yincheng.zhong
3 天以前 30303d366d1a0d857357c90bed876686f2d1e603
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
COM28串口实际测试脚本 - 接收GPS和IMU数据
测试STM32 PythonLink模块的数据输出
"""
 
import serial
import struct
import time
from datetime import datetime
 
# 帧格式常量
FRAME_HEADER = b'\xAA\x55'
FRAME_FOOTER = b'\x0D\x0A'
 
# 数据类型
TYPE_GPS = 0x01
TYPE_IMU = 0x02
 
# 数据包大小
GPS_STRUCT_SIZE = 44  # 2*double + 5*float + uint32 + 2*uint8 + 2 padding
IMU_STRUCT_SIZE = 32  # 7*float + uint32
 
# 数据结构格式(小端)
GPS_STRUCT_FMT = '<dd5fIBB2x'  # latitude, longitude, heading, east_vel, north_vel, up_vel, altitude, timestamp, satellites, fix_quality, padding
IMU_STRUCT_FMT = '<7fI'        # accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z, temp, timestamp
 
class SerialReceiver:
    def __init__(self, port='COM28', baudrate=921600):
        """初始化串口接收器"""
        self.port = port
        self.baudrate = baudrate
        self.serial = None
        self.buffer = bytearray()
        
        # 统计信息
        self.gps_count = 0
        self.imu_count = 0
        self.error_count = 0
        self.start_time = None
        
    def connect(self):
        """连接串口"""
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=0.1
            )
            print(f"✓ 成功连接到 {self.port} @ {self.baudrate} bps")
            self.start_time = time.time()
            return True
        except Exception as e:
            print(f"✗ 串口连接失败: {e}")
            return False
    
    def disconnect(self):
        """断开串口"""
        if self.serial and self.serial.is_open:
            self.serial.close()
            print(f"\n串口已关闭")
    
    def calculate_checksum(self, data):
        """计算16位累加和校验"""
        checksum = sum(data) & 0xFFFF
        return checksum
    
    def parse_gps_data(self, payload, timestamp_ms):
        """解析GPS数据包"""
        try:
            data = struct.unpack(GPS_STRUCT_FMT, payload)
            latitude, longitude, heading, east_vel, north_vel, up_vel, altitude, timestamp, satellites, fix_quality = data
            
            print(f"\n[GPS #{self.gps_count}] {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
            print(f"  位置: 纬度 {latitude:.8f}°, 经度 {longitude:.8f}°")
            print(f"  航向角: {heading:.2f}°")
            print(f"  速度: 东 {east_vel:.3f} m/s, 北 {north_vel:.3f} m/s, 天 {up_vel:.3f} m/s")
            print(f"  海拔: {altitude:.2f} m")
            print(f"  卫星数: {satellites}, 定位质量: {fix_quality}")
            print(f"  时间戳: {timestamp} ms")
            
            return True
        except Exception as e:
            print(f"✗ GPS数据解析失败: {e}")
            return False
    
    def parse_imu_data(self, payload, timestamp_ms):
        """解析IMU数据包"""
        try:
            data = struct.unpack(IMU_STRUCT_FMT, payload)
            accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z, temp, timestamp = data
            
            print(f"\n[IMU #{self.imu_count}] {datetime.now().strftime('%H:%M:%S.%f')[:-3]}")
            print(f"  加速度: X {accel_x:+.3f} g, Y {accel_y:+.3f} g, Z {accel_z:+.3f} g")
            print(f"  角速度: X {gyro_x:+.3f} °/s, Y {gyro_y:+.3f} °/s, Z {gyro_z:+.3f} °/s")
            print(f"  温度: {temp:.1f} °C")
            print(f"  时间戳: {timestamp} ms")
            
            return True
        except Exception as e:
            print(f"✗ IMU数据解析失败: {e}")
            return False
    
    def process_frame(self, frame_data):
        """处理一个完整的帧"""
        if len(frame_data) < 9:  # 至少: header(2) + type(1) + len(2) + checksum(2) + footer(2)
            return
        
        # 解析帧头
        if frame_data[:2] != FRAME_HEADER:
            return
        
        frame_type = frame_data[2]
        payload_len = struct.unpack('<H', frame_data[3:5])[0]
        
        # 检查帧长度
        expected_len = 2 + 1 + 2 + payload_len + 2 + 2  # header + type + len + payload + checksum + footer
        if len(frame_data) < expected_len:
            return
        
        # 提取数据
        payload = frame_data[5:5+payload_len]
        checksum_received = struct.unpack('<H', frame_data[5+payload_len:5+payload_len+2])[0]
        footer = frame_data[5+payload_len+2:5+payload_len+4]
        
        # 验证帧尾
        if footer != FRAME_FOOTER:
            print(f"✗ 帧尾错误: {footer.hex()}")
            self.error_count += 1
            return
        
        # 验证校验和
        checksum_data = frame_data[2:5+payload_len]  # type + len + payload
        checksum_calc = self.calculate_checksum(checksum_data)
        
        if checksum_received != checksum_calc:
            print(f"✗ 校验和错误: 接收 0x{checksum_received:04X}, 计算 0x{checksum_calc:04X}")
            self.error_count += 1
            return
        
        # 根据类型解析数据
        timestamp_ms = int(time.time() * 1000)
        
        if frame_type == TYPE_GPS:
            self.gps_count += 1
            if payload_len == GPS_STRUCT_SIZE:
                self.parse_gps_data(payload, timestamp_ms)
            else:
                print(f"✗ GPS数据长度错误: 期望 {GPS_STRUCT_SIZE}, 实际 {payload_len}")
                self.error_count += 1
                
        elif frame_type == TYPE_IMU:
            self.imu_count += 1
            if payload_len == IMU_STRUCT_SIZE:
                self.parse_imu_data(payload, timestamp_ms)
            else:
                print(f"✗ IMU数据长度错误: 期望 {IMU_STRUCT_SIZE}, 实际 {payload_len}")
                self.error_count += 1
        else:
            print(f"✗ 未知帧类型: 0x{frame_type:02X}")
            self.error_count += 1
    
    def find_and_process_frames(self):
        """在缓冲区中查找并处理完整的帧"""
        while True:
            # 查找帧头
            header_pos = self.buffer.find(FRAME_HEADER)
            if header_pos == -1:
                # 没有找到帧头,保留最后2个字节(可能是跨缓冲区的帧头)
                if len(self.buffer) > 2:
                    self.buffer = self.buffer[-2:]
                break
            
            # 丢弃帧头之前的数据
            if header_pos > 0:
                self.buffer = self.buffer[header_pos:]
            
            # 检查是否有足够的数据解析帧头
            if len(self.buffer) < 5:  # header(2) + type(1) + len(2)
                break
            
            # 解析负载长度
            payload_len = struct.unpack('<H', self.buffer[3:5])[0]
            
            # 检查长度是否合理
            if payload_len > 1024:  # 防止异常大的负载
                print(f"✗ 异常的负载长度: {payload_len}")
                self.buffer = self.buffer[2:]  # 跳过这个假帧头
                self.error_count += 1
                continue
            
            # 计算完整帧长度
            frame_len = 2 + 1 + 2 + payload_len + 2 + 2  # header + type + len + payload + checksum + footer
            
            # 检查是否收到完整的帧
            if len(self.buffer) < frame_len:
                break
            
            # 提取完整帧并处理
            frame_data = bytes(self.buffer[:frame_len])
            self.process_frame(frame_data)
            
            # 从缓冲区移除已处理的帧
            self.buffer = self.buffer[frame_len:]
    
    def print_statistics(self):
        """打印统计信息"""
        if self.start_time:
            elapsed = time.time() - self.start_time
            print(f"\n{'='*60}")
            print(f"运行时间: {elapsed:.1f} 秒")
            print(f"GPS数据包: {self.gps_count} 个 ({self.gps_count/elapsed:.1f} Hz)")
            print(f"IMU数据包: {self.imu_count} 个 ({self.imu_count/elapsed:.1f} Hz)")
            print(f"错误计数: {self.error_count} 个")
            print(f"{'='*60}")
    
    def run(self, duration=None):
        """运行接收循环"""
        print(f"\n开始接收数据... (按 Ctrl+C 停止)")
        print(f"{'='*60}")
        
        start_time = time.time()
        last_stats_time = start_time
        
        try:
            while True:
                # 读取串口数据
                if self.serial.in_waiting > 0:
                    data = self.serial.read(self.serial.in_waiting)
                    self.buffer.extend(data)
                    
                    # 处理缓冲区中的帧
                    self.find_and_process_frames()
                
                # 定期打印统计信息
                current_time = time.time()
                if current_time - last_stats_time >= 10.0:  # 每10秒
                    self.print_statistics()
                    last_stats_time = current_time
                
                # 检查是否达到指定运行时长
                if duration and (current_time - start_time) >= duration:
                    break
                
                time.sleep(0.001)  # 短暂休眠,避免CPU占用过高
                
        except KeyboardInterrupt:
            print("\n\n用户中断")
        finally:
            self.print_statistics()
            self.disconnect()
 
 
def main():
    """主函数"""
    print("="*60)
    print("STM32 GPS/IMU 数据接收测试")
    print("串口: COM28 @ 921600 bps")
    print("="*60)
    
    receiver = SerialReceiver(port='COM28', baudrate=921600)
    
    if receiver.connect():
        # 运行接收循环(无限期,直到按Ctrl+C)
        # 也可以指定运行时长,例如: receiver.run(duration=60) 运行60秒
        receiver.run()
    else:
        print("无法启动测试")
 
 
if __name__ == '__main__':
    main()