#!/usr/bin/env python3
|
# -*- coding: utf-8 -*-
|
"""
|
COM28串口实际测试脚本 - 接收GPS和IMU数据
|
测试STM32 PythonLink模块的数据输出
|
"""
|
|
import serial
|
import struct
|
import time
|
from datetime import datetime
|
|
# 帧格式常量
|
FRAME_HEADER = b'\xAA\x55'
|
FRAME_FOOTER = b'\x0D\x0A'
|
|
# 数据类型
|
TYPE_GPS = 0x01
|
TYPE_IMU = 0x02
|
|
# 数据包大小
|
GPS_STRUCT_SIZE = 44 # 2*double + 5*float + uint32 + 2*uint8 + 2 padding
|
IMU_STRUCT_SIZE = 32 # 7*float + uint32
|
|
# 数据结构格式(小端)
|
GPS_STRUCT_FMT = '<dd5fIBB2x' # latitude, longitude, heading, east_vel, north_vel, up_vel, altitude, timestamp, satellites, fix_quality, padding
|
IMU_STRUCT_FMT = '<7fI' # accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z, temp, timestamp
|
|
class SerialReceiver:
|
def __init__(self, port='COM28', baudrate=921600):
|
"""初始化串口接收器"""
|
self.port = port
|
self.baudrate = baudrate
|
self.serial = None
|
self.buffer = bytearray()
|
|
# 统计信息
|
self.gps_count = 0
|
self.imu_count = 0
|
self.error_count = 0
|
self.start_time = None
|
|
def connect(self):
|
"""连接串口"""
|
try:
|
self.serial = serial.Serial(
|
port=self.port,
|
baudrate=self.baudrate,
|
bytesize=serial.EIGHTBITS,
|
parity=serial.PARITY_NONE,
|
stopbits=serial.STOPBITS_ONE,
|
timeout=0.1
|
)
|
print(f"✓ 成功连接到 {self.port} @ {self.baudrate} bps")
|
self.start_time = time.time()
|
return True
|
except Exception as e:
|
print(f"✗ 串口连接失败: {e}")
|
return False
|
|
def disconnect(self):
|
"""断开串口"""
|
if self.serial and self.serial.is_open:
|
self.serial.close()
|
print(f"\n串口已关闭")
|
|
def calculate_checksum(self, data):
|
"""计算16位累加和校验"""
|
checksum = sum(data) & 0xFFFF
|
return checksum
|
|
def parse_gps_data(self, payload, timestamp_ms):
|
"""解析GPS数据包"""
|
try:
|
data = struct.unpack(GPS_STRUCT_FMT, payload)
|
latitude, longitude, heading, east_vel, north_vel, up_vel, altitude, timestamp, satellites, fix_quality = data
|
|
print(f"\n[GPS #{self.gps_count}] {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
|
print(f" 位置: 纬度 {latitude:.8f}°, 经度 {longitude:.8f}°")
|
print(f" 航向角: {heading:.2f}°")
|
print(f" 速度: 东 {east_vel:.3f} m/s, 北 {north_vel:.3f} m/s, 天 {up_vel:.3f} m/s")
|
print(f" 海拔: {altitude:.2f} m")
|
print(f" 卫星数: {satellites}, 定位质量: {fix_quality}")
|
print(f" 时间戳: {timestamp} ms")
|
|
return True
|
except Exception as e:
|
print(f"✗ GPS数据解析失败: {e}")
|
return False
|
|
def parse_imu_data(self, payload, timestamp_ms):
|
"""解析IMU数据包"""
|
try:
|
data = struct.unpack(IMU_STRUCT_FMT, payload)
|
accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z, temp, timestamp = data
|
|
print(f"\n[IMU #{self.imu_count}] {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
|
print(f" 加速度: X {accel_x:+.3f} g, Y {accel_y:+.3f} g, Z {accel_z:+.3f} g")
|
print(f" 角速度: X {gyro_x:+.3f} °/s, Y {gyro_y:+.3f} °/s, Z {gyro_z:+.3f} °/s")
|
print(f" 温度: {temp:.1f} °C")
|
print(f" 时间戳: {timestamp} ms")
|
|
return True
|
except Exception as e:
|
print(f"✗ IMU数据解析失败: {e}")
|
return False
|
|
def process_frame(self, frame_data):
|
"""处理一个完整的帧"""
|
if len(frame_data) < 9: # 至少: header(2) + type(1) + len(2) + checksum(2) + footer(2)
|
return
|
|
# 解析帧头
|
if frame_data[:2] != FRAME_HEADER:
|
return
|
|
frame_type = frame_data[2]
|
payload_len = struct.unpack('<H', frame_data[3:5])[0]
|
|
# 检查帧长度
|
expected_len = 2 + 1 + 2 + payload_len + 2 + 2 # header + type + len + payload + checksum + footer
|
if len(frame_data) < expected_len:
|
return
|
|
# 提取数据
|
payload = frame_data[5:5+payload_len]
|
checksum_received = struct.unpack('<H', frame_data[5+payload_len:5+payload_len+2])[0]
|
footer = frame_data[5+payload_len+2:5+payload_len+4]
|
|
# 验证帧尾
|
if footer != FRAME_FOOTER:
|
print(f"✗ 帧尾错误: {footer.hex()}")
|
self.error_count += 1
|
return
|
|
# 验证校验和
|
checksum_data = frame_data[2:5+payload_len] # type + len + payload
|
checksum_calc = self.calculate_checksum(checksum_data)
|
|
if checksum_received != checksum_calc:
|
print(f"✗ 校验和错误: 接收 0x{checksum_received:04X}, 计算 0x{checksum_calc:04X}")
|
self.error_count += 1
|
return
|
|
# 根据类型解析数据
|
timestamp_ms = int(time.time() * 1000)
|
|
if frame_type == TYPE_GPS:
|
self.gps_count += 1
|
if payload_len == GPS_STRUCT_SIZE:
|
self.parse_gps_data(payload, timestamp_ms)
|
else:
|
print(f"✗ GPS数据长度错误: 期望 {GPS_STRUCT_SIZE}, 实际 {payload_len}")
|
self.error_count += 1
|
|
elif frame_type == TYPE_IMU:
|
self.imu_count += 1
|
if payload_len == IMU_STRUCT_SIZE:
|
self.parse_imu_data(payload, timestamp_ms)
|
else:
|
print(f"✗ IMU数据长度错误: 期望 {IMU_STRUCT_SIZE}, 实际 {payload_len}")
|
self.error_count += 1
|
else:
|
print(f"✗ 未知帧类型: 0x{frame_type:02X}")
|
self.error_count += 1
|
|
def find_and_process_frames(self):
|
"""在缓冲区中查找并处理完整的帧"""
|
while True:
|
# 查找帧头
|
header_pos = self.buffer.find(FRAME_HEADER)
|
if header_pos == -1:
|
# 没有找到帧头,保留最后2个字节(可能是跨缓冲区的帧头)
|
if len(self.buffer) > 2:
|
self.buffer = self.buffer[-2:]
|
break
|
|
# 丢弃帧头之前的数据
|
if header_pos > 0:
|
self.buffer = self.buffer[header_pos:]
|
|
# 检查是否有足够的数据解析帧头
|
if len(self.buffer) < 5: # header(2) + type(1) + len(2)
|
break
|
|
# 解析负载长度
|
payload_len = struct.unpack('<H', self.buffer[3:5])[0]
|
|
# 检查长度是否合理
|
if payload_len > 1024: # 防止异常大的负载
|
print(f"✗ 异常的负载长度: {payload_len}")
|
self.buffer = self.buffer[2:] # 跳过这个假帧头
|
self.error_count += 1
|
continue
|
|
# 计算完整帧长度
|
frame_len = 2 + 1 + 2 + payload_len + 2 + 2 # header + type + len + payload + checksum + footer
|
|
# 检查是否收到完整的帧
|
if len(self.buffer) < frame_len:
|
break
|
|
# 提取完整帧并处理
|
frame_data = bytes(self.buffer[:frame_len])
|
self.process_frame(frame_data)
|
|
# 从缓冲区移除已处理的帧
|
self.buffer = self.buffer[frame_len:]
|
|
def print_statistics(self):
|
"""打印统计信息"""
|
if self.start_time:
|
elapsed = time.time() - self.start_time
|
print(f"\n{'='*60}")
|
print(f"运行时间: {elapsed:.1f} 秒")
|
print(f"GPS数据包: {self.gps_count} 个 ({self.gps_count/elapsed:.1f} Hz)")
|
print(f"IMU数据包: {self.imu_count} 个 ({self.imu_count/elapsed:.1f} Hz)")
|
print(f"错误计数: {self.error_count} 个")
|
print(f"{'='*60}")
|
|
def run(self, duration=None):
|
"""运行接收循环"""
|
print(f"\n开始接收数据... (按 Ctrl+C 停止)")
|
print(f"{'='*60}")
|
|
start_time = time.time()
|
last_stats_time = start_time
|
|
try:
|
while True:
|
# 读取串口数据
|
if self.serial.in_waiting > 0:
|
data = self.serial.read(self.serial.in_waiting)
|
self.buffer.extend(data)
|
|
# 处理缓冲区中的帧
|
self.find_and_process_frames()
|
|
# 定期打印统计信息
|
current_time = time.time()
|
if current_time - last_stats_time >= 10.0: # 每10秒
|
self.print_statistics()
|
last_stats_time = current_time
|
|
# 检查是否达到指定运行时长
|
if duration and (current_time - start_time) >= duration:
|
break
|
|
time.sleep(0.001) # 短暂休眠,避免CPU占用过高
|
|
except KeyboardInterrupt:
|
print("\n\n用户中断")
|
finally:
|
self.print_statistics()
|
self.disconnect()
|
|
|
def main():
|
"""主函数"""
|
print("="*60)
|
print("STM32 GPS/IMU 数据接收测试")
|
print("串口: COM28 @ 921600 bps")
|
print("="*60)
|
|
receiver = SerialReceiver(port='COM28', baudrate=921600)
|
|
if receiver.connect():
|
# 运行接收循环(无限期,直到按Ctrl+C)
|
# 也可以指定运行时长,例如: receiver.run(duration=60) 运行60秒
|
receiver.run()
|
else:
|
print("无法启动测试")
|
|
|
if __name__ == '__main__':
|
main()
|