"""
|
与 STM32H7 交互的协议封装:
|
- 构造 IM23A "fmin"/"fmim" 传感器帧
|
- 解析 PythonLink 控制帧
|
- 解析 ASCII 状态报文
|
"""
|
|
from __future__ import annotations
|
|
import struct
|
import math
|
from dataclasses import dataclass
|
from datetime import datetime, timezone
|
from typing import Callable, Optional, Tuple
|
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
|
"""
|
将 PWM 信号转换为线速度或角速度。
|
基于 motion_config.h 中的模型参数:
|
Forward: v = K_fwd * (1500 - pwm) + Bias (但模拟器这里主要是为了闭环,不需要完美匹配Bias,只要方向对)
|
1000 PWM = 0.56 m/s
|
1500 PWM = 0.0 m/s
|
2000 PWM = -0.2 m/s (backward)
|
Turn: w = K_turn * (1500 - pwm)
|
1000 PWM = 63 deg/s (Left)
|
2000 PWM = -61.5 deg/s (Right)
|
"""
|
if value < 1000: value = 1000
|
if value > 2000: value = 2000
|
|
# 使用参数
|
# MC_CFG_FORWARD_K = 0.00196
|
# MC_CFG_FORWARD_BIAS = -0.077
|
# MC_CFG_STEERING_K_LEFT = 0.00206
|
# MC_CFG_STEERING_K_RIGHT = 0.00201
|
|
# 简单的线性映射用于模拟,忽略死区和偏置的细节,保证大致范围正确即可
|
# 真实速度 = K * (1500 - pwm) + Bias
|
|
delta = 1500 - value
|
# 模拟器需要返回速度值,而不是设置速度。
|
# 这里的 max_speed 参数不再直接作为比例系数,而是用于指示是线速度还是角速度的缩放
|
|
if max_speed < 10.0: # 线速度模式 (m/s)
|
# 假设是 Forward PWM
|
# 1000 -> 500 diff -> 0.56 m/s => 0.00112 m/s/us
|
# 2000 -> -500 diff -> -0.56 m/s (实际车可能是0.2,但模拟器可以对称)
|
# 使用平均 K = 0.0015
|
k_lin = 0.0015
|
if abs(delta) < 40: return 0.0 # 死区
|
return delta * k_lin
|
else: # 角速度模式 (deg/s -> rad/s)
|
# 假设是 Steering PWM
|
# 1000 -> 500 diff -> 63 deg/s => 0.126 deg/s/us
|
# 2000 -> -500 diff -> -61.5 deg/s
|
# K_ang ~ 0.12 deg/s/us
|
k_ang_deg = 0.12
|
if abs(delta) < 10: return 0.0 # 死区
|
deg_s = delta * k_ang_deg
|
# 转换为 rad/s ? 下面调用是 math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
|
# 所以这里只需要返回 deg/s,调用者会转 rad/s
|
return deg_s
|
|
|
|
IM23A_NAV_LEN = 100
|
IM23A_IMU_LEN = 52
|
IM23A_NAV_HEADER = b"fmin"
|
IM23A_IMU_HEADER = b"fmim"
|
IM23A_TAIL = b"ed"
|
|
|
def _ensure_utc(dt: datetime) -> datetime:
|
if dt.tzinfo is None:
|
return dt.replace(tzinfo=timezone.utc)
|
return dt.astimezone(timezone.utc)
|
|
|
def nmea_checksum(sentence_without_star: str) -> str:
|
cs = 0
|
for ch in sentence_without_star[1:]:
|
cs ^= ord(ch)
|
return f"{cs:02X}"
|
|
|
def _write_double(buf: bytearray, value: float) -> None:
|
buf.extend(struct.pack("<d", float(value)))
|
|
|
def _write_float(buf: bytearray, value: float) -> None:
|
buf.extend(struct.pack("<f", float(value)))
|
|
|
def _write_u32(buf: bytearray, value: int) -> None:
|
buf.extend(struct.pack("<I", int(value)))
|
|
|
def _write_u16(buf: bytearray, value: int) -> None:
|
buf.extend(struct.pack("<H", int(value)))
|
|
|
def _append_tail(buf: bytearray) -> None:
|
checksum = sum(buf) & 0xFFFF
|
buf.extend(struct.pack("<H", checksum))
|
buf.extend(IM23A_TAIL)
|
|
|
def build_im23a_nav_frame(
|
timestamp: datetime,
|
lat_deg: float,
|
lon_deg: float,
|
alt_m: float,
|
east_vel: float,
|
north_vel: float,
|
up_vel: float,
|
heading_rad: float,
|
pitch_deg: float,
|
roll_deg: float,
|
accel_bias: Tuple[float, float, float],
|
gyro_bias: Tuple[float, float, float],
|
temperature_c: float,
|
status_flags: int,
|
) -> bytes:
|
buf = bytearray(IM23A_NAV_HEADER)
|
_write_double(buf, _seconds_of_day(timestamp))
|
_write_double(buf, lat_deg)
|
_write_double(buf, lon_deg)
|
_write_double(buf, alt_m)
|
_write_float(buf, north_vel)
|
_write_float(buf, east_vel)
|
_write_float(buf, -up_vel) # IM23A 向下正,MCU 内部转换回向上
|
_write_float(buf, roll_deg)
|
_write_float(buf, pitch_deg)
|
_write_float(buf, heading_rad) # 航向角:弧度值,范围-3.14到3.14,0度正北方向
|
_write_float(buf, 0.02) # 定位精度,可根据需要调整
|
_write_float(buf, accel_bias[0])
|
_write_float(buf, accel_bias[1])
|
_write_float(buf, accel_bias[2])
|
_write_float(buf, gyro_bias[0])
|
_write_float(buf, gyro_bias[1])
|
_write_float(buf, gyro_bias[2])
|
_write_float(buf, temperature_c)
|
_write_u32(buf, status_flags)
|
_append_tail(buf)
|
return bytes(buf)
|
|
|
def build_im23a_imu_frame(
|
timestamp: datetime,
|
accel_g: Tuple[float, float, float],
|
gyro_deg_s: Tuple[float, float, float],
|
reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0),
|
) -> bytes:
|
buf = bytearray(IM23A_IMU_HEADER)
|
_write_double(buf, _seconds_of_day(timestamp))
|
_write_float(buf, accel_g[0])
|
_write_float(buf, accel_g[1])
|
_write_float(buf, accel_g[2])
|
_write_float(buf, gyro_deg_s[0])
|
_write_float(buf, gyro_deg_s[1])
|
_write_float(buf, gyro_deg_s[2])
|
_write_float(buf, reserves[0])
|
_write_float(buf, reserves[1])
|
_write_float(buf, reserves[2])
|
_append_tail(buf)
|
return bytes(buf)
|
|
|
def _seconds_of_day(dt: datetime) -> float:
|
utc = _ensure_utc(dt)
|
return (
|
utc.hour * 3600
|
+ utc.minute * 60
|
+ utc.second
|
+ utc.microsecond / 1_000_000.0
|
)
|
|
|
def _decode_payload(payload: bytes) -> Tuple[float, float]:
|
if len(payload) == 8:
|
forward, turn = struct.unpack("<ff", payload)
|
return forward, turn
|
if len(payload) == 4:
|
steering_pwm, throttle_pwm = struct.unpack("<HH", payload)
|
# 重新映射:转向 PWM → yaw rate;油门 PWM → 线速度
|
forward = _pwm_to_velocity(throttle_pwm, max_speed=1.0)
|
turn = math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
|
return forward, turn
|
raise ValueError(f"不支持的控制负载长度: {len(payload)}")
|
|
|
@dataclass
|
class PythonLinkFrame:
|
forward: float
|
turn: float
|
|
|
@dataclass
|
class PythonAsciiMessage:
|
tag: str
|
fields: list[str]
|
|
|
@dataclass
|
class ControlStatus:
|
forward_mps: float
|
turn_rate: float
|
freq_hz: float
|
steering_pwm: int
|
throttle_pwm: int
|
stage: str
|
timestamp_ms: float
|
east: float
|
north: float
|
up: float
|
heading_deg: float
|
target_heading_deg: float
|
target_east: float
|
target_north: float
|
|
|
@dataclass
|
class PoseStatus:
|
east: float
|
north: float
|
up: float
|
heading_deg: float
|
pitch_deg: float
|
roll_deg: float
|
target_east: float
|
target_north: float
|
timestamp_ms: float
|
|
|
@dataclass
|
class StateStatus:
|
stage: str
|
cross_track_error: float
|
heading_error_deg: float
|
timestamp_ms: float
|
|
|
@dataclass
|
class StackStatus:
|
task_name: str
|
stack_high_water: int
|
heap_free_bytes: int
|
|
|
class PythonLinkDecoder:
|
"""
|
解析 PythonLink 控制帧 (0xAA 0x55 ... 0D 0A)。
|
"""
|
|
FRAME_HEADER = b"\xAA\x55"
|
FRAME_FOOTER = b"\x0D\x0A"
|
TYPE_CONTROL = 0x10
|
|
def __init__(self, on_frame: Callable[[PythonLinkFrame], None]):
|
self._buffer = bytearray()
|
self._on_frame = on_frame
|
|
def feed(self, data: bytes):
|
if not data:
|
return
|
self._buffer.extend(data)
|
while True:
|
start = self._find_header()
|
if start < 0:
|
self._buffer.clear()
|
return
|
if start > 0:
|
del self._buffer[:start]
|
if len(self._buffer) < 9:
|
return
|
frame_len = self._expected_frame_length()
|
if frame_len is None or len(self._buffer) < frame_len:
|
return
|
frame = bytes(self._buffer[:frame_len])
|
del self._buffer[:frame_len]
|
try:
|
parsed = self._parse_frame(frame)
|
if parsed:
|
self._on_frame(parsed)
|
except Exception:
|
# 丢弃无效帧,继续搜索
|
continue
|
|
def _find_header(self) -> int:
|
data = bytes(self._buffer)
|
idx = data.find(self.FRAME_HEADER)
|
return idx
|
|
def _expected_frame_length(self) -> Optional[int]:
|
if len(self._buffer) < 5:
|
return None
|
payload_len = int.from_bytes(self._buffer[3:5], "little")
|
return 2 + 1 + 2 + payload_len + 2 + 2 # header + type + len + payload + checksum + footer
|
|
def _parse_frame(self, frame: bytes) -> Optional[PythonLinkFrame]:
|
if not (frame.startswith(self.FRAME_HEADER) and frame.endswith(self.FRAME_FOOTER)):
|
return None
|
frame_type = frame[2]
|
if frame_type != self.TYPE_CONTROL:
|
return None
|
payload_len = int.from_bytes(frame[3:5], "little")
|
payload_start = 5
|
payload_end = payload_start + payload_len
|
payload = frame[payload_start:payload_end]
|
checksum_received = int.from_bytes(frame[payload_end:payload_end + 2], "little")
|
checksum_calc = sum(frame[2:payload_end]) & 0xFFFF
|
if checksum_received != checksum_calc:
|
raise ValueError("校验和不匹配")
|
forward, turn = _decode_payload(payload)
|
return PythonLinkFrame(forward=forward, turn=turn)
|
|
|
def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]:
|
if not line:
|
return None
|
line = line.strip()
|
if not line.startswith("$"):
|
return None
|
checksum_str = ""
|
data_end = len(line)
|
star_idx = line.find("*")
|
if star_idx != -1:
|
data_end = star_idx
|
checksum_str = line[star_idx + 1 : star_idx + 3]
|
payload = line[1:data_end]
|
calc = 0
|
for ch in payload:
|
calc ^= ord(ch)
|
if checksum_str:
|
try:
|
provided = int(checksum_str, 16)
|
except ValueError:
|
return None
|
if provided != calc:
|
return None
|
parts = payload.split(",")
|
if not parts:
|
return None
|
tag = parts[0]
|
fields = parts[1:]
|
return PythonAsciiMessage(tag=tag, fields=fields)
|
|
|
def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]:
|
if msg.tag.upper() != "CTRL" or len(msg.fields) < 14:
|
return None
|
try:
|
forward = float(msg.fields[0])
|
turn = float(msg.fields[1])
|
freq = float(msg.fields[2])
|
steering = int(float(msg.fields[3]))
|
throttle = int(float(msg.fields[4]))
|
stage = msg.fields[5]
|
timestamp = float(msg.fields[6])
|
east = float(msg.fields[7])
|
north = float(msg.fields[8])
|
up = float(msg.fields[9])
|
heading = float(msg.fields[10])
|
target_heading = float(msg.fields[11])
|
target_e = float(msg.fields[12])
|
target_n = float(msg.fields[13])
|
except ValueError:
|
return None
|
return ControlStatus(
|
forward_mps=forward,
|
turn_rate=turn,
|
freq_hz=freq,
|
steering_pwm=steering,
|
throttle_pwm=throttle,
|
stage=stage,
|
timestamp_ms=timestamp,
|
east=east,
|
north=north,
|
up=up,
|
heading_deg=heading,
|
target_heading_deg=target_heading,
|
target_east=target_e,
|
target_north=target_n,
|
)
|
|
|
def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]:
|
if msg.tag.upper() != "POSE" or len(msg.fields) < 8:
|
return None
|
try:
|
east = float(msg.fields[0])
|
north = float(msg.fields[1])
|
up = float(msg.fields[2])
|
heading = float(msg.fields[3])
|
pitch = float(msg.fields[4])
|
roll = float(msg.fields[5])
|
target_e = float(msg.fields[6])
|
target_n = float(msg.fields[7])
|
timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0
|
except ValueError:
|
return None
|
return PoseStatus(
|
east=east,
|
north=north,
|
up=up,
|
heading_deg=heading,
|
pitch_deg=pitch,
|
roll_deg=roll,
|
target_east=target_e,
|
target_north=target_n,
|
timestamp_ms=timestamp,
|
)
|
|
|
def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]:
|
if msg.tag.upper() != "STATE" or len(msg.fields) < 3:
|
return None
|
stage = msg.fields[0]
|
try:
|
xte = float(msg.fields[1])
|
heading_err = float(msg.fields[2])
|
timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0
|
except ValueError:
|
return None
|
return StateStatus(
|
stage=stage,
|
cross_track_error=xte,
|
heading_error_deg=heading_err,
|
timestamp_ms=timestamp,
|
)
|
|
|
def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]:
|
if msg.tag.upper() != "STACK" or len(msg.fields) < 3:
|
return None
|
task = msg.fields[0]
|
try:
|
stack_hw = int(float(msg.fields[1]))
|
heap_free = int(float(msg.fields[2]))
|
except ValueError:
|
return None
|
return StackStatus(task_name=task,
|
stack_high_water=stack_hw,
|
heap_free_bytes=heap_free)
|