"""
|
与 STM32H7 交互的协议封装:
|
- 构造 $GPRMI / $GPIMU 句子
|
- 解析 PythonLink 控制帧
|
- 解析 ASCII 状态报文
|
"""
|
|
from __future__ import annotations
|
|
import math
|
import struct
|
from dataclasses import dataclass
|
from datetime import datetime, timezone
|
from typing import Callable, Optional, Tuple
|
|
GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)
|
|
|
def _ensure_utc(dt: datetime) -> datetime:
|
if dt.tzinfo is None:
|
return dt.replace(tzinfo=timezone.utc)
|
return dt.astimezone(timezone.utc)
|
|
|
def gps_week_and_tow(timestamp: datetime) -> Tuple[int, float]:
|
dt = _ensure_utc(timestamp)
|
delta = dt - GPS_EPOCH
|
total_seconds = delta.total_seconds()
|
week = int(total_seconds // 604800)
|
tow = total_seconds - week * 604800
|
return week, tow
|
|
|
def format_hhmmss(timestamp: datetime, decimals: int = 3) -> str:
|
dt = _ensure_utc(timestamp)
|
base = dt.strftime("%H%M%S")
|
frac = dt.microsecond / 1_000_000.0
|
frac_str = f"{frac:.{decimals}f}".split(".")[1]
|
return f"{base}.{frac_str}"
|
|
|
def nmea_checksum(sentence_without_star: str) -> str:
|
cs = 0
|
for ch in sentence_without_star[1:]: # 跳过 $
|
cs ^= ord(ch)
|
return f"{cs:02X}"
|
|
|
def build_gprmi_sentence(
|
timestamp: datetime,
|
lat_deg: float,
|
lon_deg: float,
|
alt_m: float,
|
east_vel: float,
|
north_vel: float,
|
up_vel: float,
|
heading_deg: float,
|
pitch_deg: float,
|
roll_deg: float,
|
*,
|
lat_std: float = 0.008,
|
lon_std: float = 0.008,
|
alt_std: float = 0.02,
|
vel_std: float = 0.02,
|
heading_std: float = 0.1,
|
pitch_std: float = 0.1,
|
roll_std: float = 0.1,
|
baseline_m: float = 1.0,
|
sat_count: int = 20,
|
ambiguity_count: int = 18,
|
quality: int = 4,
|
) -> bytes:
|
timestamp = _ensure_utc(timestamp)
|
utc_str = format_hhmmss(timestamp, decimals=2)
|
week, tow = gps_week_and_tow(timestamp)
|
|
fields = [
|
utc_str,
|
str(week),
|
f"{tow:.3f}",
|
f"{lat_deg:.9f}",
|
f"{lon_deg:.9f}",
|
f"{alt_m:.3f}",
|
f"{lat_std:.3f}",
|
f"{lon_std:.3f}",
|
f"{alt_std:.3f}",
|
f"{east_vel:.3f}",
|
f"{north_vel:.3f}",
|
f"{up_vel:.3f}",
|
f"{vel_std:.3f}",
|
f"{heading_deg:.3f}",
|
f"{pitch_deg:.3f}",
|
f"{roll_deg:.3f}",
|
f"{heading_std:.3f}",
|
f"{pitch_std:.3f}",
|
f"{roll_std:.3f}",
|
f"{baseline_m:.3f}",
|
str(int(sat_count)),
|
str(int(ambiguity_count)),
|
str(int(quality)),
|
]
|
|
body = "$GPFMI," + ",".join(fields)
|
checksum = nmea_checksum(body)
|
sentence = f"{body}*{checksum}\r\n"
|
return sentence.encode("ascii")
|
|
|
def build_gpimu_sentence(
|
timestamp: datetime,
|
accel_g: Tuple[float, float, float],
|
gyro_deg_s: Tuple[float, float, float],
|
temperature_c: float,
|
) -> bytes:
|
timestamp = _ensure_utc(timestamp)
|
utc_str = format_hhmmss(timestamp, decimals=3)
|
|
fields = [
|
utc_str,
|
f"{accel_g[0]:+.4f}",
|
f"{accel_g[1]:+.4f}",
|
f"{accel_g[2]:+.4f}",
|
f"{gyro_deg_s[0]:+.4f}",
|
f"{gyro_deg_s[1]:+.4f}",
|
f"{gyro_deg_s[2]:+.4f}",
|
f"{temperature_c:.2f}",
|
]
|
|
body = "$GPIMU," + ",".join(fields)
|
checksum = nmea_checksum(body)
|
sentence = f"{body}*{checksum}\r\n"
|
return sentence.encode("ascii")
|
|
|
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
|
ratio = _clamp((value - center) / span, -1.0, 1.0)
|
return ratio * max_speed
|
|
|
def _clamp(val: float, min_val: float, max_val: float) -> float:
|
return max(min_val, min(max_val, val))
|
|
|
def _decode_payload(payload: bytes) -> Tuple[float, float]:
|
if len(payload) == 8:
|
forward, turn = struct.unpack("<ff", payload)
|
return forward, turn
|
if len(payload) == 4:
|
steering_pwm, throttle_pwm = struct.unpack("<HH", payload)
|
# 重新映射:转向 PWM → yaw rate;油门 PWM → 线速度
|
forward = _pwm_to_velocity(throttle_pwm)
|
turn = math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
|
return forward, turn
|
raise ValueError(f"不支持的控制负载长度: {len(payload)}")
|
|
|
@dataclass
|
class PythonLinkFrame:
|
forward: float
|
turn: float
|
|
|
@dataclass
|
class PythonAsciiMessage:
|
tag: str
|
fields: list[str]
|
|
|
@dataclass
|
class ControlStatus:
|
forward_mps: float
|
turn_rate: float
|
freq_hz: float
|
steering_pwm: int
|
throttle_pwm: int
|
stage: str
|
timestamp_ms: float
|
|
|
@dataclass
|
class PoseStatus:
|
east: float
|
north: float
|
up: float
|
heading_deg: float
|
pitch_deg: float
|
roll_deg: float
|
target_east: float
|
target_north: float
|
timestamp_ms: float
|
|
|
@dataclass
|
class StateStatus:
|
stage: str
|
cross_track_error: float
|
heading_error_deg: float
|
timestamp_ms: float
|
|
|
@dataclass
|
class StackStatus:
|
task_name: str
|
stack_high_water: int
|
heap_free_bytes: int
|
|
|
class PythonLinkDecoder:
|
"""
|
解析 PythonLink 控制帧 (0xAA 0x55 ... 0D 0A)。
|
"""
|
|
FRAME_HEADER = b"\xAA\x55"
|
FRAME_FOOTER = b"\x0D\x0A"
|
TYPE_CONTROL = 0x10
|
|
def __init__(self, on_frame: Callable[[PythonLinkFrame], None]):
|
self._buffer = bytearray()
|
self._on_frame = on_frame
|
|
def feed(self, data: bytes):
|
if not data:
|
return
|
self._buffer.extend(data)
|
while True:
|
start = self._find_header()
|
if start < 0:
|
self._buffer.clear()
|
return
|
if start > 0:
|
del self._buffer[:start]
|
if len(self._buffer) < 9:
|
return
|
frame_len = self._expected_frame_length()
|
if frame_len is None or len(self._buffer) < frame_len:
|
return
|
frame = bytes(self._buffer[:frame_len])
|
del self._buffer[:frame_len]
|
try:
|
parsed = self._parse_frame(frame)
|
if parsed:
|
self._on_frame(parsed)
|
except Exception:
|
# 丢弃无效帧,继续搜索
|
continue
|
|
def _find_header(self) -> int:
|
data = bytes(self._buffer)
|
idx = data.find(self.FRAME_HEADER)
|
return idx
|
|
def _expected_frame_length(self) -> Optional[int]:
|
if len(self._buffer) < 5:
|
return None
|
payload_len = int.from_bytes(self._buffer[3:5], "little")
|
return 2 + 1 + 2 + payload_len + 2 + 2 # header + type + len + payload + checksum + footer
|
|
def _parse_frame(self, frame: bytes) -> Optional[PythonLinkFrame]:
|
if not (frame.startswith(self.FRAME_HEADER) and frame.endswith(self.FRAME_FOOTER)):
|
return None
|
frame_type = frame[2]
|
if frame_type != self.TYPE_CONTROL:
|
return None
|
payload_len = int.from_bytes(frame[3:5], "little")
|
payload_start = 5
|
payload_end = payload_start + payload_len
|
payload = frame[payload_start:payload_end]
|
checksum_received = int.from_bytes(frame[payload_end:payload_end + 2], "little")
|
checksum_calc = sum(frame[2:payload_end]) & 0xFFFF
|
if checksum_received != checksum_calc:
|
raise ValueError("校验和不匹配")
|
forward, turn = _decode_payload(payload)
|
return PythonLinkFrame(forward=forward, turn=turn)
|
|
|
def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]:
|
if not line:
|
return None
|
line = line.strip()
|
if not line.startswith("$"):
|
return None
|
checksum_str = ""
|
data_end = len(line)
|
star_idx = line.find("*")
|
if star_idx != -1:
|
data_end = star_idx
|
checksum_str = line[star_idx + 1 : star_idx + 3]
|
payload = line[1:data_end]
|
calc = 0
|
for ch in payload:
|
calc ^= ord(ch)
|
if checksum_str:
|
try:
|
provided = int(checksum_str, 16)
|
except ValueError:
|
return None
|
if provided != calc:
|
return None
|
parts = payload.split(",")
|
if not parts:
|
return None
|
tag = parts[0]
|
fields = parts[1:]
|
return PythonAsciiMessage(tag=tag, fields=fields)
|
|
|
def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]:
|
if msg.tag.upper() != "CTRL" or len(msg.fields) < 6:
|
return None
|
try:
|
forward = float(msg.fields[0])
|
turn = float(msg.fields[1])
|
freq = float(msg.fields[2])
|
steering = int(float(msg.fields[3]))
|
throttle = int(float(msg.fields[4]))
|
stage = msg.fields[5]
|
timestamp = float(msg.fields[6]) if len(msg.fields) > 6 else 0.0
|
except ValueError:
|
return None
|
return ControlStatus(
|
forward_mps=forward,
|
turn_rate=turn,
|
freq_hz=freq,
|
steering_pwm=steering,
|
throttle_pwm=throttle,
|
stage=stage,
|
timestamp_ms=timestamp,
|
)
|
|
|
def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]:
|
if msg.tag.upper() != "POSE" or len(msg.fields) < 8:
|
return None
|
try:
|
east = float(msg.fields[0])
|
north = float(msg.fields[1])
|
up = float(msg.fields[2])
|
heading = float(msg.fields[3])
|
pitch = float(msg.fields[4])
|
roll = float(msg.fields[5])
|
target_e = float(msg.fields[6])
|
target_n = float(msg.fields[7])
|
timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0
|
except ValueError:
|
return None
|
return PoseStatus(
|
east=east,
|
north=north,
|
up=up,
|
heading_deg=heading,
|
pitch_deg=pitch,
|
roll_deg=roll,
|
target_east=target_e,
|
target_north=target_n,
|
timestamp_ms=timestamp,
|
)
|
|
|
def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]:
|
if msg.tag.upper() != "STATE" or len(msg.fields) < 3:
|
return None
|
stage = msg.fields[0]
|
try:
|
xte = float(msg.fields[1])
|
heading_err = float(msg.fields[2])
|
timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0
|
except ValueError:
|
return None
|
return StateStatus(
|
stage=stage,
|
cross_track_error=xte,
|
heading_error_deg=heading_err,
|
timestamp_ms=timestamp,
|
)
|
|
|
def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]:
|
if msg.tag.upper() != "STACK" or len(msg.fields) < 3:
|
return None
|
task = msg.fields[0]
|
try:
|
stack_hw = int(float(msg.fields[1]))
|
heap_free = int(float(msg.fields[2]))
|
except ValueError:
|
return None
|
return StackStatus(task_name=task,
|
stack_high_water=stack_hw,
|
heap_free_bytes=heap_free)
|