#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ COM28串口实际测试脚本 - 接收GPS和IMU数据 测试STM32 PythonLink模块的数据输出 """ import serial import struct import time from datetime import datetime # 帧格式常量 FRAME_HEADER = b'\xAA\x55' FRAME_FOOTER = b'\x0D\x0A' # 数据类型 TYPE_GPS = 0x01 TYPE_IMU = 0x02 # 数据包大小 GPS_STRUCT_SIZE = 44 # 2*double + 5*float + uint32 + 2*uint8 + 2 padding IMU_STRUCT_SIZE = 32 # 7*float + uint32 # 数据结构格式(小端) GPS_STRUCT_FMT = ' 2: self.buffer = self.buffer[-2:] break # 丢弃帧头之前的数据 if header_pos > 0: self.buffer = self.buffer[header_pos:] # 检查是否有足够的数据解析帧头 if len(self.buffer) < 5: # header(2) + type(1) + len(2) break # 解析负载长度 payload_len = struct.unpack(' 1024: # 防止异常大的负载 print(f"✗ 异常的负载长度: {payload_len}") self.buffer = self.buffer[2:] # 跳过这个假帧头 self.error_count += 1 continue # 计算完整帧长度 frame_len = 2 + 1 + 2 + payload_len + 2 + 2 # header + type + len + payload + checksum + footer # 检查是否收到完整的帧 if len(self.buffer) < frame_len: break # 提取完整帧并处理 frame_data = bytes(self.buffer[:frame_len]) self.process_frame(frame_data) # 从缓冲区移除已处理的帧 self.buffer = self.buffer[frame_len:] def print_statistics(self): """打印统计信息""" if self.start_time: elapsed = time.time() - self.start_time print(f"\n{'='*60}") print(f"运行时间: {elapsed:.1f} 秒") print(f"GPS数据包: {self.gps_count} 个 ({self.gps_count/elapsed:.1f} Hz)") print(f"IMU数据包: {self.imu_count} 个 ({self.imu_count/elapsed:.1f} Hz)") print(f"错误计数: {self.error_count} 个") print(f"{'='*60}") def run(self, duration=None): """运行接收循环""" print(f"\n开始接收数据... (按 Ctrl+C 停止)") print(f"{'='*60}") start_time = time.time() last_stats_time = start_time try: while True: # 读取串口数据 if self.serial.in_waiting > 0: data = self.serial.read(self.serial.in_waiting) self.buffer.extend(data) # 处理缓冲区中的帧 self.find_and_process_frames() # 定期打印统计信息 current_time = time.time() if current_time - last_stats_time >= 10.0: # 每10秒 self.print_statistics() last_stats_time = current_time # 检查是否达到指定运行时长 if duration and (current_time - start_time) >= duration: break time.sleep(0.001) # 短暂休眠,避免CPU占用过高 except KeyboardInterrupt: print("\n\n用户中断") finally: self.print_statistics() self.disconnect() def main(): """主函数""" print("="*60) print("STM32 GPS/IMU 数据接收测试") print("串口: COM28 @ 921600 bps") print("="*60) receiver = SerialReceiver(port='COM28', baudrate=921600) if receiver.connect(): # 运行接收循环(无限期,直到按Ctrl+C) # 也可以指定运行时长,例如: receiver.run(duration=60) 运行60秒 receiver.run() else: print("无法启动测试") if __name__ == '__main__': main()