#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ GPS/IMU 数据接收解析器 用于接收来自 STM32H7 的 GPS 和 IMU 数据包 协议格式: AA 55 TYPE LEN DATA CHECKSUM 0D 0A """ import serial import struct import time from typing import Optional, Tuple from dataclasses import dataclass @dataclass class GPSData: """GPS 数据包结构""" latitude: float # 纬度(°) longitude: float # 经度(°) heading_angle: float # 航向角(°), 0~360 pitch_angle: float # 俯仰角(°) roll_angle: float # 横滚角(°) east_velocity: float # 东方向速度(m/s) north_velocity: float # 北方向速度(m/s) up_velocity: float # 天顶方向速度(m/s) altitude: float # 高程(m) utc_time: int # UTC时间, hhmmss格式 position_quality: int # 定位质量 satellite_count: int # 卫星数量 @dataclass class IMUData: """IMU 数据包结构""" accel_x: float # X轴加速度(g) accel_y: float # Y轴加速度(g) accel_z: float # Z轴加速度(g) gyro_x: float # X轴角速度(°/s) gyro_y: float # Y轴角速度(°/s) gyro_z: float # Z轴角速度(°/s) temperature: float # 传感器温度(℃) utc_time: int # UTC时间(毫秒) class GPSIMUReceiver: """GPS/IMU 数据接收解析器""" # 协议常量 FRAME_HEADER1 = 0xAA FRAME_HEADER2 = 0x55 FRAME_FOOTER1 = 0x0D FRAME_FOOTER2 = 0x0A TYPE_GPS = 0x01 TYPE_IMU = 0x02 # GPS数据包格式: 2个double + 7个float + 1个uint32 + 2个uint8 + 2字节对齐 = 52字节 # double(8) x2 + float(4) x7 + uint32(4) + uint8(1) x2 + padding(2) GPS_STRUCT_FMT = ' bool: """ 连接串口 Returns: 成功返回True, 失败返回False """ try: self.serial = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1.0 ) print(f"[INFO] 串口已连接: {self.port} @ {self.baudrate} bps") return True except Exception as e: print(f"[ERROR] 串口连接失败: {e}") return False def disconnect(self): """断开串口连接""" if self.serial and self.serial.is_open: self.serial.close() print("[INFO] 串口已断开") def _find_frame_header(self) -> bool: """ 查找帧头 AA 55 Returns: 找到返回True, 超时返回False """ ser = self.serial if ser is None: return False byte1 = ser.read(1) if len(byte1) == 0: return False while byte1[0] != self.FRAME_HEADER1: byte1 = ser.read(1) if len(byte1) == 0: return False byte2 = ser.read(1) if len(byte2) == 0 or byte2[0] != self.FRAME_HEADER2: return False return True def _parse_gps_data(self, payload: bytes) -> Optional[GPSData]: """ 解析GPS数据包 Args: payload: GPS数据负载 Returns: 解析成功返回GPSData对象, 失败返回None """ try: if len(payload) != self.GPS_STRUCT_SIZE: print(f"[WARN] GPS数据长度错误: {len(payload)} != {self.GPS_STRUCT_SIZE}") return None unpacked = struct.unpack(self.GPS_STRUCT_FMT, payload) return GPSData( latitude=unpacked[0], longitude=unpacked[1], heading_angle=unpacked[2], pitch_angle=unpacked[3], roll_angle=unpacked[4], east_velocity=unpacked[5], north_velocity=unpacked[6], up_velocity=unpacked[7], altitude=unpacked[8], utc_time=unpacked[9], position_quality=unpacked[10], satellite_count=unpacked[11] ) except Exception as e: print(f"[ERROR] GPS数据解析失败: {e}") return None def _parse_imu_data(self, payload: bytes) -> Optional[IMUData]: """ 解析IMU数据包 Args: payload: IMU数据负载 Returns: 解析成功返回IMUData对象, 失败返回None """ try: if len(payload) != self.IMU_STRUCT_SIZE: print(f"[WARN] IMU数据长度错误: {len(payload)} != {self.IMU_STRUCT_SIZE}") return None unpacked = struct.unpack(self.IMU_STRUCT_FMT, payload) return IMUData( accel_x=unpacked[0], accel_y=unpacked[1], accel_z=unpacked[2], gyro_x=unpacked[3], gyro_y=unpacked[4], gyro_z=unpacked[5], temperature=unpacked[6], utc_time=unpacked[7] ) except Exception as e: print(f"[ERROR] IMU数据解析失败: {e}") return None def receive_packet(self) -> Tuple[Optional[GPSData], Optional[IMUData]]: """ 接收并解析一个数据包 Returns: (gps_data, imu_data) 元组, 其中一个为None """ if not self.serial or not self.serial.is_open: return None, None ser = self.serial if ser is None: return None, None # 查找帧头 if not self._find_frame_header(): return None, None # 读取类型和长度 header_data = ser.read(3) if len(header_data) != 3: self.error_count += 1 return None, None data_type = header_data[0] data_len = struct.unpack(' 10.0: receiver.print_stats() last_stats_time = time.time() except KeyboardInterrupt: print("\n[INFO] 用户中断") finally: receiver.print_stats() receiver.disconnect() if __name__ == "__main__": main()