#!/usr/bin/env python3
|
# -*- coding: utf-8 -*-
|
"""
|
GPS/IMU 数据接收解析器
|
用于接收来自 STM32H7 的 GPS 和 IMU 数据包
|
协议格式: AA 55 TYPE LEN DATA CHECKSUM 0D 0A
|
"""
|
|
import serial
|
import struct
|
import time
|
from typing import Optional, Tuple
|
from dataclasses import dataclass
|
|
|
@dataclass
|
class GPSData:
|
"""GPS 数据包结构"""
|
latitude: float # 纬度(°)
|
longitude: float # 经度(°)
|
heading_angle: float # 航向角(°), 0~360
|
pitch_angle: float # 俯仰角(°)
|
roll_angle: float # 横滚角(°)
|
east_velocity: float # 东方向速度(m/s)
|
north_velocity: float # 北方向速度(m/s)
|
up_velocity: float # 天顶方向速度(m/s)
|
altitude: float # 高程(m)
|
utc_time: int # UTC时间, hhmmss格式
|
position_quality: int # 定位质量
|
satellite_count: int # 卫星数量
|
|
|
@dataclass
|
class IMUData:
|
"""IMU 数据包结构"""
|
accel_x: float # X轴加速度(g)
|
accel_y: float # Y轴加速度(g)
|
accel_z: float # Z轴加速度(g)
|
gyro_x: float # X轴角速度(°/s)
|
gyro_y: float # Y轴角速度(°/s)
|
gyro_z: float # Z轴角速度(°/s)
|
temperature: float # 传感器温度(℃)
|
utc_time: int # UTC时间(毫秒)
|
|
|
class GPSIMUReceiver:
|
"""GPS/IMU 数据接收解析器"""
|
|
# 协议常量
|
FRAME_HEADER1 = 0xAA
|
FRAME_HEADER2 = 0x55
|
FRAME_FOOTER1 = 0x0D
|
FRAME_FOOTER2 = 0x0A
|
|
TYPE_GPS = 0x01
|
TYPE_IMU = 0x02
|
|
# GPS数据包格式: 2个double + 7个float + 1个uint32 + 2个uint8 + 2字节对齐 = 52字节
|
# double(8) x2 + float(4) x7 + uint32(4) + uint8(1) x2 + padding(2)
|
GPS_STRUCT_FMT = '<dd7fIBB2x'
|
GPS_STRUCT_SIZE = 52
|
|
# IMU数据包格式: 7f I (32 bytes)
|
IMU_STRUCT_FMT = '<7f I'
|
IMU_STRUCT_SIZE = 32
|
|
def __init__(self, port: str, baudrate: int = 921600):
|
"""
|
初始化串口接收器
|
|
Args:
|
port: 串口设备名 (例如 'COM3' 或 '/dev/ttyUSB0')
|
baudrate: 波特率 (默认 921600)
|
"""
|
self.port = port
|
self.baudrate = baudrate
|
self.serial = None
|
|
# 统计计数
|
self.gps_count = 0
|
self.imu_count = 0
|
self.error_count = 0
|
|
def connect(self) -> bool:
|
"""
|
连接串口
|
|
Returns:
|
成功返回True, 失败返回False
|
"""
|
try:
|
self.serial = serial.Serial(
|
port=self.port,
|
baudrate=self.baudrate,
|
bytesize=serial.EIGHTBITS,
|
parity=serial.PARITY_NONE,
|
stopbits=serial.STOPBITS_ONE,
|
timeout=1.0
|
)
|
print(f"[INFO] 串口已连接: {self.port} @ {self.baudrate} bps")
|
return True
|
except Exception as e:
|
print(f"[ERROR] 串口连接失败: {e}")
|
return False
|
|
def disconnect(self):
|
"""断开串口连接"""
|
if self.serial and self.serial.is_open:
|
self.serial.close()
|
print("[INFO] 串口已断开")
|
|
def _find_frame_header(self) -> bool:
|
"""
|
查找帧头 AA 55
|
|
Returns:
|
找到返回True, 超时返回False
|
"""
|
ser = self.serial
|
if ser is None:
|
return False
|
|
byte1 = ser.read(1)
|
if len(byte1) == 0:
|
return False
|
|
while byte1[0] != self.FRAME_HEADER1:
|
byte1 = ser.read(1)
|
if len(byte1) == 0:
|
return False
|
|
byte2 = ser.read(1)
|
if len(byte2) == 0 or byte2[0] != self.FRAME_HEADER2:
|
return False
|
|
return True
|
|
def _parse_gps_data(self, payload: bytes) -> Optional[GPSData]:
|
"""
|
解析GPS数据包
|
|
Args:
|
payload: GPS数据负载
|
|
Returns:
|
解析成功返回GPSData对象, 失败返回None
|
"""
|
try:
|
if len(payload) != self.GPS_STRUCT_SIZE:
|
print(f"[WARN] GPS数据长度错误: {len(payload)} != {self.GPS_STRUCT_SIZE}")
|
return None
|
|
unpacked = struct.unpack(self.GPS_STRUCT_FMT, payload)
|
|
return GPSData(
|
latitude=unpacked[0],
|
longitude=unpacked[1],
|
heading_angle=unpacked[2],
|
pitch_angle=unpacked[3],
|
roll_angle=unpacked[4],
|
east_velocity=unpacked[5],
|
north_velocity=unpacked[6],
|
up_velocity=unpacked[7],
|
altitude=unpacked[8],
|
utc_time=unpacked[9],
|
position_quality=unpacked[10],
|
satellite_count=unpacked[11]
|
)
|
except Exception as e:
|
print(f"[ERROR] GPS数据解析失败: {e}")
|
return None
|
|
def _parse_imu_data(self, payload: bytes) -> Optional[IMUData]:
|
"""
|
解析IMU数据包
|
|
Args:
|
payload: IMU数据负载
|
|
Returns:
|
解析成功返回IMUData对象, 失败返回None
|
"""
|
try:
|
if len(payload) != self.IMU_STRUCT_SIZE:
|
print(f"[WARN] IMU数据长度错误: {len(payload)} != {self.IMU_STRUCT_SIZE}")
|
return None
|
|
unpacked = struct.unpack(self.IMU_STRUCT_FMT, payload)
|
|
return IMUData(
|
accel_x=unpacked[0],
|
accel_y=unpacked[1],
|
accel_z=unpacked[2],
|
gyro_x=unpacked[3],
|
gyro_y=unpacked[4],
|
gyro_z=unpacked[5],
|
temperature=unpacked[6],
|
utc_time=unpacked[7]
|
)
|
except Exception as e:
|
print(f"[ERROR] IMU数据解析失败: {e}")
|
return None
|
|
def receive_packet(self) -> Tuple[Optional[GPSData], Optional[IMUData]]:
|
"""
|
接收并解析一个数据包
|
|
Returns:
|
(gps_data, imu_data) 元组, 其中一个为None
|
"""
|
if not self.serial or not self.serial.is_open:
|
return None, None
|
|
ser = self.serial
|
if ser is None:
|
return None, None
|
|
# 查找帧头
|
if not self._find_frame_header():
|
return None, None
|
|
# 读取类型和长度
|
header_data = ser.read(3)
|
if len(header_data) != 3:
|
self.error_count += 1
|
return None, None
|
|
data_type = header_data[0]
|
data_len = struct.unpack('<H', header_data[1:3])[0]
|
|
# 读取数据负载
|
payload = ser.read(data_len)
|
if len(payload) != data_len:
|
self.error_count += 1
|
print(f"[WARN] 数据长度不匹配: 期望{data_len}, 实际{len(payload)}")
|
return None, None
|
|
# 读取Checksum和帧尾
|
footer_data = ser.read(4)
|
if len(footer_data) != 4:
|
self.error_count += 1
|
return None, None
|
|
checksum_received = struct.unpack('<H', footer_data[0:2])[0]
|
footer1 = footer_data[2]
|
footer2 = footer_data[3]
|
|
# 校验帧尾
|
if footer1 != self.FRAME_FOOTER1 or footer2 != self.FRAME_FOOTER2:
|
self.error_count += 1
|
print(f"[WARN] 帧尾错误: {footer1:02X} {footer2:02X}")
|
return None, None
|
|
# 计算并校验Checksum (16位累加和)
|
frame_for_checksum = bytes([data_type]) + struct.pack('<H', data_len) + payload
|
checksum_calculated = sum(frame_for_checksum) & 0xFFFF
|
|
if checksum_received != checksum_calculated:
|
self.error_count += 1
|
print(f"[WARN] Checksum校验失败: 接收{checksum_received:04X}, 计算{checksum_calculated:04X}")
|
return None, None
|
|
# 根据类型解析数据
|
if data_type == self.TYPE_GPS:
|
self.gps_count += 1
|
gps_data = self._parse_gps_data(payload)
|
return gps_data, None
|
elif data_type == self.TYPE_IMU:
|
self.imu_count += 1
|
imu_data = self._parse_imu_data(payload)
|
return None, imu_data
|
else:
|
self.error_count += 1
|
print(f"[WARN] 未知数据类型: {data_type:02X}")
|
return None, None
|
|
def print_stats(self):
|
"""打印统计信息"""
|
print(f"\n========== 接收统计 ==========")
|
print(f"GPS数据包: {self.gps_count}")
|
print(f"IMU数据包: {self.imu_count}")
|
print(f"错误计数: {self.error_count}")
|
print(f"==============================\n")
|
|
|
def main():
|
"""主函数示例"""
|
# 配置串口 (根据实际情况修改)
|
PORT = "COM17" # Windows: 'COM17', Linux: '/dev/ttyUSB0'
|
BAUDRATE = 921600
|
|
receiver = GPSIMUReceiver(PORT, BAUDRATE)
|
|
if not receiver.connect():
|
return
|
|
print("[INFO] 开始接收数据... (按 Ctrl+C 停止)")
|
|
try:
|
last_stats_time = time.time()
|
|
while True:
|
gps_data, imu_data = receiver.receive_packet()
|
|
if gps_data:
|
print(f"[GPS] 纬度: {gps_data.latitude:.8f}°, "
|
f"经度: {gps_data.longitude:.8f}°, "
|
f"航向: {gps_data.heading_angle:.2f}°, "
|
f"俯仰: {gps_data.pitch_angle:.2f}°, "
|
f"横滚: {gps_data.roll_angle:.2f}°, "
|
f"速度(E/N/U): {gps_data.east_velocity:.2f}/{gps_data.north_velocity:.2f}/{gps_data.up_velocity:.2f} m/s, "
|
f"卫星: {gps_data.satellite_count}, "
|
f"质量: {gps_data.position_quality}")
|
|
if imu_data:
|
print(f"[IMU] 加速度(X/Y/Z): {imu_data.accel_x:.3f}/{imu_data.accel_y:.3f}/{imu_data.accel_z:.3f} g, "
|
f"角速度(X/Y/Z): {imu_data.gyro_x:.2f}/{imu_data.gyro_y:.2f}/{imu_data.gyro_z:.2f} °/s, "
|
f"温度: {imu_data.temperature:.1f}℃")
|
|
# 每10秒打印一次统计信息
|
if time.time() - last_stats_time > 10.0:
|
receiver.print_stats()
|
last_stats_time = time.time()
|
|
except KeyboardInterrupt:
|
print("\n[INFO] 用户中断")
|
|
finally:
|
receiver.print_stats()
|
receiver.disconnect()
|
|
|
if __name__ == "__main__":
|
main()
|