#!/usr/bin/env python3
|
# -*- coding: utf-8 -*-
|
"""
|
GPS/IMU PythonLink 协议验证脚本
|
|
功能:
|
1. 串口实时读取帧 (live 模式) 验证 GPS 帧 checksum 与结构解包
|
2. 构造本地示例帧 (sample 模式) 做自测
|
|
协议: Frame = AA 55 | Type(1) | Len(2 LE) | Payload | Checksum(2 LE) | 0D 0A
|
Checksum = (sum of Type + Len(2B) + Payload bytes) & 0xFFFF (小端存储)
|
GPS Type = 0x01, Payload = 44 字节:
|
double lat, double lon,
|
float heading,
|
float vel_east, float vel_north, float vel_up,
|
float altitude,
|
uint32 utc_time,
|
uint8 pos_quality, uint8 sat_count, 2 bytes reserved
|
|
使用示例:
|
python tools/verify_gps.py --port COM28 --baud 921600 --mode live
|
python tools/verify_gps.py --mode sample
|
|
按 Ctrl+C 退出实时模式。
|
"""
|
import argparse
|
import struct
|
import sys
|
import time
|
from typing import Optional, Tuple
|
|
try:
|
import serial # pyserial
|
except ImportError:
|
serial = None
|
|
HEADER1 = 0xAA
|
HEADER2 = 0x55
|
FOOTER1 = 0x0D
|
FOOTER2 = 0x0A
|
TYPE_GPS = 0x01
|
# TYPE_IMU = 0x02 # 备用
|
GPS_PAYLOAD_LEN = 44
|
FRAME_HEADER_LEN = 4 # 0xAA 0x55 Type Len(2)
|
FRAME_FOOTER_LEN = 4 # Checksum(2) 0x0D 0x0A
|
|
GPS_STRUCT_FMT = '<ddfffffI2B2x'
|
# 验证结构尺寸
|
assert struct.calcsize(GPS_STRUCT_FMT) == GPS_PAYLOAD_LEN, 'GPS struct size mismatch'
|
|
class GPSFrame:
|
def __init__(self, lat, lon, heading, vel_e, vel_n, vel_u, altitude, utc_time, pos_q, sat_cnt):
|
self.lat = lat
|
self.lon = lon
|
self.heading = heading
|
self.vel_e = vel_e
|
self.vel_n = vel_n
|
self.vel_u = vel_u
|
self.altitude = altitude
|
self.utc_time = utc_time
|
self.pos_q = pos_q
|
self.sat_cnt = sat_cnt
|
|
def __str__(self):
|
return (f'GPSFrame(lat={self.lat:.8f}, lon={self.lon:.8f}, heading={self.heading:.2f}, '
|
f'vel_e={self.vel_e:.2f}, vel_n={self.vel_n:.2f}, vel_u={self.vel_u:.2f}, '
|
f'alt={self.altitude:.2f}, utc={self.utc_time}, posQ={self.pos_q}, sats={self.sat_cnt})')
|
|
def calc_checksum(type_byte: int, length_le: bytes, payload: bytes) -> int:
|
s = type_byte + sum(length_le) + sum(payload)
|
return s & 0xFFFF
|
|
def parse_gps_payload(payload: bytes) -> GPSFrame:
|
(lat, lon, heading, vel_e, vel_n, vel_u, altitude, utc_time, pos_q, sat_cnt) = struct.unpack(GPS_STRUCT_FMT, payload)
|
return GPSFrame(lat, lon, heading, vel_e, vel_n, vel_u, altitude, utc_time, pos_q, sat_cnt)
|
|
def parse_frame(buf: bytes) -> Tuple[Optional[GPSFrame], str]:
|
# 最小长度检查
|
if len(buf) < FRAME_HEADER_LEN + FRAME_FOOTER_LEN:
|
return None, 'buffer too short'
|
if buf[0] != HEADER1 or buf[1] != HEADER2:
|
return None, 'header mismatch'
|
type_byte = buf[2]
|
length = buf[3] | (buf[4] << 8)
|
total_needed = FRAME_HEADER_LEN + length + FRAME_FOOTER_LEN
|
if len(buf) < total_needed:
|
return None, 'incomplete frame'
|
payload = buf[5:5+length]
|
checksum_recv = buf[5+length] | (buf[6+length] << 8)
|
footer1 = buf[7+length]
|
footer2 = buf[8+length]
|
checksum_calc = calc_checksum(type_byte, buf[3:5], payload)
|
if checksum_recv != checksum_calc:
|
return None, f'checksum error recv=0x{checksum_recv:04X} calc=0x{checksum_calc:04X}'
|
if footer1 != FOOTER1 or footer2 != FOOTER2:
|
return None, f'footer error got=[0x{footer1:02X},0x{footer2:02X}]'
|
if type_byte != TYPE_GPS:
|
return None, f'unsupported type 0x{type_byte:02X}'
|
if length != GPS_PAYLOAD_LEN:
|
return None, f'payload length mismatch {length} != {GPS_PAYLOAD_LEN}'
|
gps = parse_gps_payload(payload)
|
return gps, 'ok'
|
|
def run_live(port: str, baud: int, timeout: float):
|
if serial is None:
|
print('pyserial 未安装,先执行: pip install pyserial', file=sys.stderr)
|
sys.exit(1)
|
ser = serial.Serial(port=port, baudrate=baud, timeout=0.05)
|
print(f'[Live] 打开串口 {port} @ {baud}')
|
buffer = bytearray()
|
stats_ok = 0
|
stats_err = 0
|
last_print = time.time()
|
start = time.time()
|
try:
|
while True:
|
chunk = ser.read(4096)
|
if chunk:
|
buffer.extend(chunk)
|
# 搜索帧头
|
while True:
|
idx = buffer.find(bytes([HEADER1, HEADER2]))
|
if idx < 0:
|
# 没有头,丢弃旧数据防止无限膨胀
|
if len(buffer) > 8192:
|
del buffer[:-2]
|
break
|
# 丢弃头前数据
|
if idx > 0:
|
del buffer[:idx]
|
if len(buffer) < FRAME_HEADER_LEN:
|
break
|
# 长度字段
|
if len(buffer) >= 5:
|
length = buffer[3] | (buffer[4] << 8)
|
total_len = FRAME_HEADER_LEN + length + FRAME_FOOTER_LEN
|
if len(buffer) < total_len:
|
break # 等更多字节
|
frame_bytes = bytes(buffer[:total_len])
|
del buffer[:total_len]
|
gps, status = parse_frame(frame_bytes)
|
if gps is not None:
|
stats_ok += 1
|
print(f'[OK] {gps}')
|
else:
|
stats_err += 1
|
print(f'[ERR] {status}')
|
else:
|
break
|
if timeout > 0 and (time.time() - start) >= timeout:
|
print('达到超时时间, 退出')
|
break
|
if time.time() - last_print > 5:
|
last_print = time.time()
|
print(f'[Stats] OK={stats_ok} ERR={stats_err} RunTime={time.time()-start:.1f}s')
|
except KeyboardInterrupt:
|
print('\n用户中断')
|
finally:
|
ser.close()
|
print(f'结束: OK={stats_ok} ERR={stats_err}')
|
|
def run_sample():
|
# 构造一个示例 GPS payload
|
lat = 31.12345678
|
lon = 121.98765432
|
heading = 45.25
|
vel_e = 0.12
|
vel_n = -0.05
|
vel_u = 0.01
|
altitude = 12.34
|
utc_time = 123456789
|
pos_q = 4
|
sat_cnt = 18
|
payload = struct.pack(GPS_STRUCT_FMT, lat, lon, heading, vel_e, vel_n, vel_u, altitude, utc_time, pos_q, sat_cnt)
|
length_le = struct.pack('<H', len(payload))
|
checksum = calc_checksum(TYPE_GPS, length_le, payload)
|
frame = bytearray()
|
frame.append(HEADER1)
|
frame.append(HEADER2)
|
frame.append(TYPE_GPS)
|
frame.extend(length_le)
|
frame.extend(payload)
|
frame.extend(struct.pack('<H', checksum))
|
frame.append(FOOTER1)
|
frame.append(FOOTER2)
|
gps, status = parse_frame(bytes(frame))
|
print(f'Sample frame parse status: {status}')
|
if gps:
|
print(gps)
|
print('Raw frame hex:', frame.hex())
|
|
def main():
|
ap = argparse.ArgumentParser(description='PythonLink GPS 验证工具')
|
ap.add_argument('--port', '-p', default='COM28', help='串口号 (live 模式需要)')
|
ap.add_argument('--baud', '-b', type=int, default=921600, help='波特率')
|
ap.add_argument('--mode', '-m', choices=['live', 'sample'], default='live', help='运行模式: live=实时串口, sample=本地示例')
|
ap.add_argument('--timeout', '-t', type=float, default=0, help='live 模式运行秒数(0=无限直到Ctrl+C)')
|
args = ap.parse_args()
|
if args.mode == 'sample':
|
run_sample()
|
else:
|
run_live(args.port, args.baud, args.timeout)
|
|
if __name__ == '__main__':
|
main()
|