"""
|
与 STM32H7 交互的协议封装:
|
- 构造 IM23A "fmin"/"fmim" 传感器帧
|
- 解析 PythonLink 控制帧
|
- 解析 ASCII 状态报文
|
"""
|
|
from __future__ import annotations
|
|
import struct
|
import math
|
from dataclasses import dataclass
|
from datetime import datetime, timezone
|
from typing import Callable, Optional, Tuple
|
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
|
ratio = _clamp((value - center) / span, -1.0, 1.0)
|
return ratio * max_speed
|
|
|
|
IM23A_NAV_LEN = 100
|
IM23A_IMU_LEN = 52
|
IM23A_NAV_HEADER = b"fmin"
|
IM23A_IMU_HEADER = b"fmim"
|
IM23A_TAIL = b"ed"
|
|
|
def _ensure_utc(dt: datetime) -> datetime:
|
if dt.tzinfo is None:
|
return dt.replace(tzinfo=timezone.utc)
|
return dt.astimezone(timezone.utc)
|
|
|
def nmea_checksum(sentence_without_star: str) -> str:
|
cs = 0
|
for ch in sentence_without_star[1:]:
|
cs ^= ord(ch)
|
return f"{cs:02X}"
|
|
|
def _write_double(buf: bytearray, value: float) -> None:
|
buf.extend(struct.pack("<d", float(value)))
|
|
|
def _write_float(buf: bytearray, value: float) -> None:
|
buf.extend(struct.pack("<f", float(value)))
|
|
|
def _write_u32(buf: bytearray, value: int) -> None:
|
buf.extend(struct.pack("<I", int(value)))
|
|
|
def _write_u16(buf: bytearray, value: int) -> None:
|
buf.extend(struct.pack("<H", int(value)))
|
|
|
def _append_tail(buf: bytearray) -> None:
|
checksum = sum(buf) & 0xFFFF
|
buf.extend(struct.pack("<H", checksum))
|
buf.extend(IM23A_TAIL)
|
|
|
def build_im23a_nav_frame(
|
timestamp: datetime,
|
lat_deg: float,
|
lon_deg: float,
|
alt_m: float,
|
east_vel: float,
|
north_vel: float,
|
up_vel: float,
|
heading_deg: float,
|
pitch_deg: float,
|
roll_deg: float,
|
accel_bias: Tuple[float, float, float],
|
gyro_bias: Tuple[float, float, float],
|
temperature_c: float,
|
status_flags: int,
|
) -> bytes:
|
buf = bytearray(IM23A_NAV_HEADER)
|
_write_double(buf, _seconds_of_day(timestamp))
|
_write_double(buf, lat_deg)
|
_write_double(buf, lon_deg)
|
_write_double(buf, alt_m)
|
_write_float(buf, north_vel)
|
_write_float(buf, east_vel)
|
_write_float(buf, -up_vel) # IM23A 向下正,MCU 内部转换回向上
|
_write_float(buf, roll_deg)
|
_write_float(buf, pitch_deg)
|
_write_float(buf, heading_deg)
|
_write_float(buf, 0.02) # 定位精度,可根据需要调整
|
_write_float(buf, accel_bias[0])
|
_write_float(buf, accel_bias[1])
|
_write_float(buf, accel_bias[2])
|
_write_float(buf, gyro_bias[0])
|
_write_float(buf, gyro_bias[1])
|
_write_float(buf, gyro_bias[2])
|
_write_float(buf, temperature_c)
|
_write_u32(buf, status_flags)
|
_append_tail(buf)
|
return bytes(buf)
|
|
|
def build_im23a_imu_frame(
|
timestamp: datetime,
|
accel_g: Tuple[float, float, float],
|
gyro_deg_s: Tuple[float, float, float],
|
reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0),
|
) -> bytes:
|
buf = bytearray(IM23A_IMU_HEADER)
|
_write_double(buf, _seconds_of_day(timestamp))
|
_write_float(buf, accel_g[0])
|
_write_float(buf, accel_g[1])
|
_write_float(buf, accel_g[2])
|
_write_float(buf, gyro_deg_s[0])
|
_write_float(buf, gyro_deg_s[1])
|
_write_float(buf, gyro_deg_s[2])
|
_write_float(buf, reserves[0])
|
_write_float(buf, reserves[1])
|
_write_float(buf, reserves[2])
|
_append_tail(buf)
|
return bytes(buf)
|
|
|
def _seconds_of_day(dt: datetime) -> float:
|
utc = _ensure_utc(dt)
|
return (
|
utc.hour * 3600
|
+ utc.minute * 60
|
+ utc.second
|
+ utc.microsecond / 1_000_000.0
|
)
|
|
|
def _decode_payload(payload: bytes) -> Tuple[float, float]:
|
if len(payload) == 8:
|
forward, turn = struct.unpack("<ff", payload)
|
return forward, turn
|
if len(payload) == 4:
|
steering_pwm, throttle_pwm = struct.unpack("<HH", payload)
|
# 重新映射:转向 PWM → yaw rate;油门 PWM → 线速度
|
forward = _pwm_to_velocity(throttle_pwm)
|
turn = math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
|
return forward, turn
|
raise ValueError(f"不支持的控制负载长度: {len(payload)}")
|
|
|
@dataclass
|
class PythonLinkFrame:
|
forward: float
|
turn: float
|
|
|
@dataclass
|
class PythonAsciiMessage:
|
tag: str
|
fields: list[str]
|
|
|
@dataclass
|
class ControlStatus:
|
forward_mps: float
|
turn_rate: float
|
freq_hz: float
|
steering_pwm: int
|
throttle_pwm: int
|
stage: str
|
timestamp_ms: float
|
east: float
|
north: float
|
up: float
|
heading_deg: float
|
target_heading_deg: float
|
target_east: float
|
target_north: float
|
|
|
@dataclass
|
class PoseStatus:
|
east: float
|
north: float
|
up: float
|
heading_deg: float
|
pitch_deg: float
|
roll_deg: float
|
target_east: float
|
target_north: float
|
timestamp_ms: float
|
|
|
@dataclass
|
class StateStatus:
|
stage: str
|
cross_track_error: float
|
heading_error_deg: float
|
timestamp_ms: float
|
|
|
@dataclass
|
class StackStatus:
|
task_name: str
|
stack_high_water: int
|
heap_free_bytes: int
|
|
|
class PythonLinkDecoder:
|
"""
|
解析 PythonLink 控制帧 (0xAA 0x55 ... 0D 0A)。
|
"""
|
|
FRAME_HEADER = b"\xAA\x55"
|
FRAME_FOOTER = b"\x0D\x0A"
|
TYPE_CONTROL = 0x10
|
|
def __init__(self, on_frame: Callable[[PythonLinkFrame], None]):
|
self._buffer = bytearray()
|
self._on_frame = on_frame
|
|
def feed(self, data: bytes):
|
if not data:
|
return
|
self._buffer.extend(data)
|
while True:
|
start = self._find_header()
|
if start < 0:
|
self._buffer.clear()
|
return
|
if start > 0:
|
del self._buffer[:start]
|
if len(self._buffer) < 9:
|
return
|
frame_len = self._expected_frame_length()
|
if frame_len is None or len(self._buffer) < frame_len:
|
return
|
frame = bytes(self._buffer[:frame_len])
|
del self._buffer[:frame_len]
|
try:
|
parsed = self._parse_frame(frame)
|
if parsed:
|
self._on_frame(parsed)
|
except Exception:
|
# 丢弃无效帧,继续搜索
|
continue
|
|
def _find_header(self) -> int:
|
data = bytes(self._buffer)
|
idx = data.find(self.FRAME_HEADER)
|
return idx
|
|
def _expected_frame_length(self) -> Optional[int]:
|
if len(self._buffer) < 5:
|
return None
|
payload_len = int.from_bytes(self._buffer[3:5], "little")
|
return 2 + 1 + 2 + payload_len + 2 + 2 # header + type + len + payload + checksum + footer
|
|
def _parse_frame(self, frame: bytes) -> Optional[PythonLinkFrame]:
|
if not (frame.startswith(self.FRAME_HEADER) and frame.endswith(self.FRAME_FOOTER)):
|
return None
|
frame_type = frame[2]
|
if frame_type != self.TYPE_CONTROL:
|
return None
|
payload_len = int.from_bytes(frame[3:5], "little")
|
payload_start = 5
|
payload_end = payload_start + payload_len
|
payload = frame[payload_start:payload_end]
|
checksum_received = int.from_bytes(frame[payload_end:payload_end + 2], "little")
|
checksum_calc = sum(frame[2:payload_end]) & 0xFFFF
|
if checksum_received != checksum_calc:
|
raise ValueError("校验和不匹配")
|
forward, turn = _decode_payload(payload)
|
return PythonLinkFrame(forward=forward, turn=turn)
|
|
|
def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]:
|
if not line:
|
return None
|
line = line.strip()
|
if not line.startswith("$"):
|
return None
|
checksum_str = ""
|
data_end = len(line)
|
star_idx = line.find("*")
|
if star_idx != -1:
|
data_end = star_idx
|
checksum_str = line[star_idx + 1 : star_idx + 3]
|
payload = line[1:data_end]
|
calc = 0
|
for ch in payload:
|
calc ^= ord(ch)
|
if checksum_str:
|
try:
|
provided = int(checksum_str, 16)
|
except ValueError:
|
return None
|
if provided != calc:
|
return None
|
parts = payload.split(",")
|
if not parts:
|
return None
|
tag = parts[0]
|
fields = parts[1:]
|
return PythonAsciiMessage(tag=tag, fields=fields)
|
|
|
def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]:
|
if msg.tag.upper() != "CTRL" or len(msg.fields) < 14:
|
return None
|
try:
|
forward = float(msg.fields[0])
|
turn = float(msg.fields[1])
|
freq = float(msg.fields[2])
|
steering = int(float(msg.fields[3]))
|
throttle = int(float(msg.fields[4]))
|
stage = msg.fields[5]
|
timestamp = float(msg.fields[6])
|
east = float(msg.fields[7])
|
north = float(msg.fields[8])
|
up = float(msg.fields[9])
|
heading = float(msg.fields[10])
|
target_heading = float(msg.fields[11])
|
target_e = float(msg.fields[12])
|
target_n = float(msg.fields[13])
|
except ValueError:
|
return None
|
return ControlStatus(
|
forward_mps=forward,
|
turn_rate=turn,
|
freq_hz=freq,
|
steering_pwm=steering,
|
throttle_pwm=throttle,
|
stage=stage,
|
timestamp_ms=timestamp,
|
east=east,
|
north=north,
|
up=up,
|
heading_deg=heading,
|
target_heading_deg=target_heading,
|
target_east=target_e,
|
target_north=target_n,
|
)
|
|
|
def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]:
|
if msg.tag.upper() != "POSE" or len(msg.fields) < 8:
|
return None
|
try:
|
east = float(msg.fields[0])
|
north = float(msg.fields[1])
|
up = float(msg.fields[2])
|
heading = float(msg.fields[3])
|
pitch = float(msg.fields[4])
|
roll = float(msg.fields[5])
|
target_e = float(msg.fields[6])
|
target_n = float(msg.fields[7])
|
timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0
|
except ValueError:
|
return None
|
return PoseStatus(
|
east=east,
|
north=north,
|
up=up,
|
heading_deg=heading,
|
pitch_deg=pitch,
|
roll_deg=roll,
|
target_east=target_e,
|
target_north=target_n,
|
timestamp_ms=timestamp,
|
)
|
|
|
def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]:
|
if msg.tag.upper() != "STATE" or len(msg.fields) < 3:
|
return None
|
stage = msg.fields[0]
|
try:
|
xte = float(msg.fields[1])
|
heading_err = float(msg.fields[2])
|
timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0
|
except ValueError:
|
return None
|
return StateStatus(
|
stage=stage,
|
cross_track_error=xte,
|
heading_error_deg=heading_err,
|
timestamp_ms=timestamp,
|
)
|
|
|
def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]:
|
if msg.tag.upper() != "STACK" or len(msg.fields) < 3:
|
return None
|
task = msg.fields[0]
|
try:
|
stack_hw = int(float(msg.fields[1]))
|
heap_free = int(float(msg.fields[2]))
|
except ValueError:
|
return None
|
return StackStatus(task_name=task,
|
stack_high_water=stack_hw,
|
heap_free_bytes=heap_free)
|