#!/usr/bin/env python3
|
# -*- coding: utf-8 -*-
|
"""
|
校准数据记录工具
|
从串口读取$CAL和$DEBUG数据并保存到日志文件
|
|
$CAL格式:seq,time_ms,state,throttle,steering,enu_x,enu_y,enu_z,hdg,pitch,roll,gx,gy,gz,ax,ay,az
|
$DEBUG格式:ENU,x,y,z,HDG,angle,ACC,ax,ay,az,GYRO,gx,gy,gz
|
"""
|
|
import argparse
|
import datetime
|
import serial
|
import sys
|
from pathlib import Path
|
|
|
def main():
|
parser = argparse.ArgumentParser(description='校准数据记录工具')
|
parser.add_argument('-p', '--port', type=str, required=True,
|
help='串口号 (例如: COM5 或 /dev/ttyUSB0)')
|
parser.add_argument('-b', '--baudrate', type=int, default=115200,
|
help='波特率 (默认: 115200)')
|
parser.add_argument('-o', '--output', type=str, default=None,
|
help='输出文件路径 (默认: calibration_YYYYMMDD_HHMMSS.log)')
|
|
args = parser.parse_args()
|
|
# 生成输出文件名
|
if args.output is None:
|
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
|
output_file = Path(f'calibration_{timestamp}.log')
|
else:
|
output_file = Path(args.output)
|
|
print("=" * 60)
|
print("校准数据记录工具")
|
print("=" * 60)
|
print(f"串口: {args.port}")
|
print(f"波特率: {args.baudrate}")
|
print(f"输出文件: {output_file.absolute()}")
|
print("=" * 60)
|
print("按 Ctrl+C 停止记录")
|
print("=" * 60)
|
|
try:
|
# 打开串口
|
ser = serial.Serial(args.port, args.baudrate, timeout=1)
|
print(f"串口已打开: {args.port}")
|
|
# 打开输出文件
|
with open(output_file, 'w', encoding='utf-8') as f:
|
line_count = 0
|
cal_count = 0
|
|
while True:
|
try:
|
# 读取一行
|
line = ser.readline()
|
|
if line:
|
try:
|
line_str = line.decode('utf-8', errors='ignore').strip()
|
except:
|
continue
|
|
if not line_str:
|
continue
|
|
# 写入文件
|
f.write(line_str + '\n')
|
f.flush()
|
line_count += 1
|
|
# 统计$CAL行
|
if line_str.startswith('$CAL'):
|
cal_count += 1
|
# 只打印$CAL行
|
print(f"[{cal_count:04d}] {line_str}")
|
elif line_str.startswith('$DEBUG'):
|
# 打印调试信息(ENU坐标、加速度、角速度)
|
print(f"[DEBUG] {line_str}")
|
else:
|
# 其他行(如调试信息)也记录,但不打印到屏幕
|
pass
|
|
# 检测结束标志
|
if 'FINISHED' in line_str or 'EMERGENCY' in line_str:
|
print("\n" + "=" * 60)
|
print("检测到测试结束标志")
|
print("=" * 60)
|
break
|
|
except KeyboardInterrupt:
|
print("\n用户中断")
|
break
|
except Exception as e:
|
print(f"\n错误: {e}")
|
continue
|
|
print("\n" + "=" * 60)
|
print("记录完成")
|
print(f"总行数: {line_count}")
|
print(f"$CAL数据行数: {cal_count}")
|
print(f"输出文件: {output_file.absolute()}")
|
print("=" * 60)
|
|
if cal_count > 0:
|
print("\n下一步:")
|
print(f" python tools/calibration_analyzer.py {output_file}")
|
|
except serial.SerialException as e:
|
print(f"错误: 无法打开串口 {args.port}: {e}")
|
sys.exit(1)
|
except Exception as e:
|
print(f"错误: {e}")
|
sys.exit(1)
|
finally:
|
if 'ser' in locals() and ser.is_open:
|
ser.close()
|
print("串口已关闭")
|
|
|
if __name__ == '__main__':
|
main()
|