yincheng.zhong
7 天以前 b53fff11e6f0d560594834de32886239cbba90a3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
校准数据记录工具
从串口读取$CAL和$DEBUG数据并保存到日志文件
 
$CAL格式:seq,time_ms,state,throttle,steering,enu_x,enu_y,enu_z,hdg,pitch,roll,gx,gy,gz,ax,ay,az
$DEBUG格式:ENU,x,y,z,HDG,angle,ACC,ax,ay,az,GYRO,gx,gy,gz
"""
 
import argparse
import datetime
import serial
import sys
from pathlib import Path
 
 
def main():
    parser = argparse.ArgumentParser(description='校准数据记录工具')
    parser.add_argument('-p', '--port', type=str, required=True,
                       help='串口号 (例如: COM5 或 /dev/ttyUSB0)')
    parser.add_argument('-b', '--baudrate', type=int, default=115200,
                       help='波特率 (默认: 115200)')
    parser.add_argument('-o', '--output', type=str, default=None,
                       help='输出文件路径 (默认: calibration_YYYYMMDD_HHMMSS.log)')
    
    args = parser.parse_args()
    
    # 生成输出文件名
    if args.output is None:
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        output_file = Path(f'calibration_{timestamp}.log')
    else:
        output_file = Path(args.output)
    
    print("=" * 60)
    print("校准数据记录工具")
    print("=" * 60)
    print(f"串口: {args.port}")
    print(f"波特率: {args.baudrate}")
    print(f"输出文件: {output_file.absolute()}")
    print("=" * 60)
    print("按 Ctrl+C 停止记录")
    print("=" * 60)
    
    try:
        # 打开串口
        ser = serial.Serial(args.port, args.baudrate, timeout=1)
        print(f"串口已打开: {args.port}")
        
        # 打开输出文件
        with open(output_file, 'w', encoding='utf-8') as f:
            line_count = 0
            cal_count = 0
            
            while True:
                try:
                    # 读取一行
                    line = ser.readline()
                    
                    if line:
                        try:
                            line_str = line.decode('utf-8', errors='ignore').strip()
                        except:
                            continue
                        
                        if not line_str:
                            continue
                        
                        # 写入文件
                        f.write(line_str + '\n')
                        f.flush()
                        line_count += 1
                        
                        # 统计$CAL行
                        if line_str.startswith('$CAL'):
                            cal_count += 1
                            # 只打印$CAL行
                            print(f"[{cal_count:04d}] {line_str}")
                        elif line_str.startswith('$DEBUG'):
                            # 打印调试信息(ENU坐标、加速度、角速度)
                            print(f"[DEBUG] {line_str}")
                        else:
                            # 其他行(如调试信息)也记录,但不打印到屏幕
                            pass
                        
                        # 检测结束标志
                        if 'FINISHED' in line_str or 'EMERGENCY' in line_str:
                            print("\n" + "=" * 60)
                            print("检测到测试结束标志")
                            print("=" * 60)
                            break
                
                except KeyboardInterrupt:
                    print("\n用户中断")
                    break
                except Exception as e:
                    print(f"\n错误: {e}")
                    continue
        
        print("\n" + "=" * 60)
        print("记录完成")
        print(f"总行数: {line_count}")
        print(f"$CAL数据行数: {cal_count}")
        print(f"输出文件: {output_file.absolute()}")
        print("=" * 60)
        
        if cal_count > 0:
            print("\n下一步:")
            print(f"  python tools/calibration_analyzer.py {output_file}")
        
    except serial.SerialException as e:
        print(f"错误: 无法打开串口 {args.port}: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"错误: {e}")
        sys.exit(1)
    finally:
        if 'ser' in locals() and ser.is_open:
            ser.close()
            print("串口已关闭")
 
 
if __name__ == '__main__':
    main()