yincheng.zhong
2025-12-10 8d0d8dec18cc9170f3fb82a4dba18160dd8e5233
python/hitl/protocols.py
@@ -1,18 +1,72 @@
"""
与 STM32H7 交互的协议封装:
- 构造 $GPRMI / $GPIMU 句子
- 构造 IM23A "fmin"/"fmim" 传感器帧
- 解析 PythonLink 控制帧
- 解析 ASCII 状态报文
"""
from __future__ import annotations
import math
import struct
import math
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Optional, Tuple
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
    """
    将 PWM 信号转换为线速度或角速度。
    基于 motion_config.h 中的模型参数:
    Forward: v = K_fwd * (1500 - pwm) + Bias (但模拟器这里主要是为了闭环,不需要完美匹配Bias,只要方向对)
             1000 PWM = 0.56 m/s
             1500 PWM = 0.0 m/s
             2000 PWM = -0.2 m/s (backward)
    Turn:    w = K_turn * (1500 - pwm)
             1000 PWM = 63 deg/s (Left)
             2000 PWM = -61.5 deg/s (Right)
    """
    if value < 1000: value = 1000
    if value > 2000: value = 2000
    # 使用参数
    # MC_CFG_FORWARD_K = 0.00196
    # MC_CFG_FORWARD_BIAS = -0.077
    # MC_CFG_STEERING_K_LEFT = 0.00206
    # MC_CFG_STEERING_K_RIGHT = 0.00201
    # 简单的线性映射用于模拟,忽略死区和偏置的细节,保证大致范围正确即可
    # 真实速度 = K * (1500 - pwm) + Bias
    delta = 1500 - value
    # 模拟器需要返回速度值,而不是设置速度。
    # 这里的 max_speed 参数不再直接作为比例系数,而是用于指示是线速度还是角速度的缩放
    if max_speed < 10.0: # 线速度模式 (m/s)
        # 假设是 Forward PWM
        # 1000 -> 500 diff -> 0.56 m/s => 0.00112 m/s/us
        # 2000 -> -500 diff -> -0.56 m/s (实际车可能是0.2,但模拟器可以对称)
        # 使用平均 K = 0.0015
        k_lin = 0.0015
        if abs(delta) < 40: return 0.0 # 死区
        return delta * k_lin
    else: # 角速度模式 (deg/s -> rad/s)
        # 假设是 Steering PWM
        # 1000 -> 500 diff -> 63 deg/s => 0.126 deg/s/us
        # 2000 -> -500 diff -> -61.5 deg/s
        # K_ang ~ 0.12 deg/s/us
        k_ang_deg = 0.12
        if abs(delta) < 10: return 0.0 # 死区
        deg_s = delta * k_ang_deg
        # 转换为 rad/s ? 下面调用是 math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
        # 所以这里只需要返回 deg/s,调用者会转 rad/s
        return deg_s
GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)
IM23A_NAV_LEN = 100
IM23A_IMU_LEN = 52
IM23A_NAV_HEADER = b"fmin"
IM23A_IMU_HEADER = b"fmim"
IM23A_TAIL = b"ed"
def _ensure_utc(dt: datetime) -> datetime:
@@ -21,31 +75,36 @@
    return dt.astimezone(timezone.utc)
def gps_week_and_tow(timestamp: datetime) -> Tuple[int, float]:
    dt = _ensure_utc(timestamp)
    delta = dt - GPS_EPOCH
    total_seconds = delta.total_seconds()
    week = int(total_seconds // 604800)
    tow = total_seconds - week * 604800
    return week, tow
def format_hhmmss(timestamp: datetime, decimals: int = 3) -> str:
    dt = _ensure_utc(timestamp)
    base = dt.strftime("%H%M%S")
    frac = dt.microsecond / 1_000_000.0
    frac_str = f"{frac:.{decimals}f}".split(".")[1]
    return f"{base}.{frac_str}"
def nmea_checksum(sentence_without_star: str) -> str:
    cs = 0
    for ch in sentence_without_star[1:]:  # 跳过 $
    for ch in sentence_without_star[1:]:
        cs ^= ord(ch)
    return f"{cs:02X}"
def build_gprmi_sentence(
def _write_double(buf: bytearray, value: float) -> None:
    buf.extend(struct.pack("<d", float(value)))
def _write_float(buf: bytearray, value: float) -> None:
    buf.extend(struct.pack("<f", float(value)))
def _write_u32(buf: bytearray, value: int) -> None:
    buf.extend(struct.pack("<I", int(value)))
def _write_u16(buf: bytearray, value: int) -> None:
    buf.extend(struct.pack("<H", int(value)))
def _append_tail(buf: bytearray) -> None:
    checksum = sum(buf) & 0xFFFF
    buf.extend(struct.pack("<H", checksum))
    buf.extend(IM23A_TAIL)
def build_im23a_nav_frame(
    timestamp: datetime,
    lat_deg: float,
    lon_deg: float,
@@ -53,91 +112,67 @@
    east_vel: float,
    north_vel: float,
    up_vel: float,
    heading_deg: float,
    heading_rad: float,
    pitch_deg: float,
    roll_deg: float,
    *,
    lat_std: float = 0.008,
    lon_std: float = 0.008,
    alt_std: float = 0.02,
    vel_std: float = 0.02,
    heading_std: float = 0.1,
    pitch_std: float = 0.1,
    roll_std: float = 0.1,
    baseline_m: float = 1.0,
    sat_count: int = 20,
    ambiguity_count: int = 18,
    quality: int = 4,
    accel_bias: Tuple[float, float, float],
    gyro_bias: Tuple[float, float, float],
    temperature_c: float,
    status_flags: int,
) -> bytes:
    timestamp = _ensure_utc(timestamp)
    utc_str = format_hhmmss(timestamp, decimals=2)
    week, tow = gps_week_and_tow(timestamp)
    fields = [
        utc_str,
        str(week),
        f"{tow:.3f}",
        f"{lat_deg:.9f}",
        f"{lon_deg:.9f}",
        f"{alt_m:.3f}",
        f"{lat_std:.3f}",
        f"{lon_std:.3f}",
        f"{alt_std:.3f}",
        f"{east_vel:.3f}",
        f"{north_vel:.3f}",
        f"{up_vel:.3f}",
        f"{vel_std:.3f}",
        f"{heading_deg:.3f}",
        f"{pitch_deg:.3f}",
        f"{roll_deg:.3f}",
        f"{heading_std:.3f}",
        f"{pitch_std:.3f}",
        f"{roll_std:.3f}",
        f"{baseline_m:.3f}",
        str(int(sat_count)),
        str(int(ambiguity_count)),
        str(int(quality)),
    ]
    body = "$GPRMI," + ",".join(fields)
    checksum = nmea_checksum(body)
    sentence = f"{body}*{checksum}\r\n"
    return sentence.encode("ascii")
    buf = bytearray(IM23A_NAV_HEADER)
    _write_double(buf, _seconds_of_day(timestamp))
    _write_double(buf, lat_deg)
    _write_double(buf, lon_deg)
    _write_double(buf, alt_m)
    _write_float(buf, north_vel)
    _write_float(buf, east_vel)
    _write_float(buf, -up_vel)  # IM23A 向下正,MCU 内部转换回向上
    _write_float(buf, roll_deg)
    _write_float(buf, pitch_deg)
    _write_float(buf, heading_rad)  # 航向角:弧度值,范围-3.14到3.14,0度正北方向
    _write_float(buf, 0.02)  # 定位精度,可根据需要调整
    _write_float(buf, accel_bias[0])
    _write_float(buf, accel_bias[1])
    _write_float(buf, accel_bias[2])
    _write_float(buf, gyro_bias[0])
    _write_float(buf, gyro_bias[1])
    _write_float(buf, gyro_bias[2])
    _write_float(buf, temperature_c)
    _write_u32(buf, status_flags)
    _append_tail(buf)
    return bytes(buf)
def build_gpimu_sentence(
def build_im23a_imu_frame(
    timestamp: datetime,
    accel_g: Tuple[float, float, float],
    gyro_deg_s: Tuple[float, float, float],
    temperature_c: float,
    reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0),
) -> bytes:
    timestamp = _ensure_utc(timestamp)
    utc_str = format_hhmmss(timestamp, decimals=3)
    fields = [
        utc_str,
        f"{accel_g[0]:+.4f}",
        f"{accel_g[1]:+.4f}",
        f"{accel_g[2]:+.4f}",
        f"{gyro_deg_s[0]:+.4f}",
        f"{gyro_deg_s[1]:+.4f}",
        f"{gyro_deg_s[2]:+.4f}",
        f"{temperature_c:.2f}",
    ]
    body = "$GPIMU," + ",".join(fields)
    checksum = nmea_checksum(body)
    sentence = f"{body}*{checksum}\r\n"
    return sentence.encode("ascii")
    buf = bytearray(IM23A_IMU_HEADER)
    _write_double(buf, _seconds_of_day(timestamp))
    _write_float(buf, accel_g[0])
    _write_float(buf, accel_g[1])
    _write_float(buf, accel_g[2])
    _write_float(buf, gyro_deg_s[0])
    _write_float(buf, gyro_deg_s[1])
    _write_float(buf, gyro_deg_s[2])
    _write_float(buf, reserves[0])
    _write_float(buf, reserves[1])
    _write_float(buf, reserves[2])
    _append_tail(buf)
    return bytes(buf)
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
    ratio = _clamp((value - center) / span, -1.0, 1.0)
    return ratio * max_speed
def _clamp(val: float, min_val: float, max_val: float) -> float:
    return max(min_val, min(max_val, val))
def _seconds_of_day(dt: datetime) -> float:
    utc = _ensure_utc(dt)
    return (
        utc.hour * 3600
        + utc.minute * 60
        + utc.second
        + utc.microsecond / 1_000_000.0
    )
def _decode_payload(payload: bytes) -> Tuple[float, float]:
@@ -147,7 +182,7 @@
    if len(payload) == 4:
        steering_pwm, throttle_pwm = struct.unpack("<HH", payload)
        # 重新映射:转向 PWM → yaw rate;油门 PWM → 线速度
        forward = _pwm_to_velocity(throttle_pwm)
        forward = _pwm_to_velocity(throttle_pwm, max_speed=1.0)
        turn = math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
        return forward, turn
    raise ValueError(f"不支持的控制负载长度: {len(payload)}")
@@ -159,6 +194,58 @@
    turn: float
@dataclass
class PythonAsciiMessage:
    tag: str
    fields: list[str]
@dataclass
class ControlStatus:
    forward_mps: float
    turn_rate: float
    freq_hz: float
    steering_pwm: int
    throttle_pwm: int
    stage: str
    timestamp_ms: float
    east: float
    north: float
    up: float
    heading_deg: float
    target_heading_deg: float
    target_east: float
    target_north: float
@dataclass
class PoseStatus:
    east: float
    north: float
    up: float
    heading_deg: float
    pitch_deg: float
    roll_deg: float
    target_east: float
    target_north: float
    timestamp_ms: float
@dataclass
class StateStatus:
    stage: str
    cross_track_error: float
    heading_error_deg: float
    timestamp_ms: float
@dataclass
class StackStatus:
    task_name: str
    stack_high_water: int
    heap_free_bytes: int
class PythonLinkDecoder:
    """
    解析 PythonLink 控制帧 (0xAA 0x55 ... 0D 0A)。
@@ -227,3 +314,132 @@
        return PythonLinkFrame(forward=forward, turn=turn)
def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]:
    if not line:
        return None
    line = line.strip()
    if not line.startswith("$"):
        return None
    checksum_str = ""
    data_end = len(line)
    star_idx = line.find("*")
    if star_idx != -1:
        data_end = star_idx
        checksum_str = line[star_idx + 1 : star_idx + 3]
    payload = line[1:data_end]
    calc = 0
    for ch in payload:
        calc ^= ord(ch)
    if checksum_str:
        try:
            provided = int(checksum_str, 16)
        except ValueError:
            return None
        if provided != calc:
            return None
    parts = payload.split(",")
    if not parts:
        return None
    tag = parts[0]
    fields = parts[1:]
    return PythonAsciiMessage(tag=tag, fields=fields)
def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]:
    if msg.tag.upper() != "CTRL" or len(msg.fields) < 14:
        return None
    try:
        forward = float(msg.fields[0])
        turn = float(msg.fields[1])
        freq = float(msg.fields[2])
        steering = int(float(msg.fields[3]))
        throttle = int(float(msg.fields[4]))
        stage = msg.fields[5]
        timestamp = float(msg.fields[6])
        east = float(msg.fields[7])
        north = float(msg.fields[8])
        up = float(msg.fields[9])
        heading = float(msg.fields[10])
        target_heading = float(msg.fields[11])
        target_e = float(msg.fields[12])
        target_n = float(msg.fields[13])
    except ValueError:
        return None
    return ControlStatus(
        forward_mps=forward,
        turn_rate=turn,
        freq_hz=freq,
        steering_pwm=steering,
        throttle_pwm=throttle,
        stage=stage,
        timestamp_ms=timestamp,
        east=east,
        north=north,
        up=up,
        heading_deg=heading,
        target_heading_deg=target_heading,
        target_east=target_e,
        target_north=target_n,
    )
def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]:
    if msg.tag.upper() != "POSE" or len(msg.fields) < 8:
        return None
    try:
        east = float(msg.fields[0])
        north = float(msg.fields[1])
        up = float(msg.fields[2])
        heading = float(msg.fields[3])
        pitch = float(msg.fields[4])
        roll = float(msg.fields[5])
        target_e = float(msg.fields[6])
        target_n = float(msg.fields[7])
        timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0
    except ValueError:
        return None
    return PoseStatus(
        east=east,
        north=north,
        up=up,
        heading_deg=heading,
        pitch_deg=pitch,
        roll_deg=roll,
        target_east=target_e,
        target_north=target_n,
        timestamp_ms=timestamp,
    )
def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]:
    if msg.tag.upper() != "STATE" or len(msg.fields) < 3:
        return None
    stage = msg.fields[0]
    try:
        xte = float(msg.fields[1])
        heading_err = float(msg.fields[2])
        timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0
    except ValueError:
        return None
    return StateStatus(
        stage=stage,
        cross_track_error=xte,
        heading_error_deg=heading_err,
        timestamp_ms=timestamp,
    )
def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]:
    if msg.tag.upper() != "STACK" or len(msg.fields) < 3:
        return None
    task = msg.fields[0]
    try:
        stack_hw = int(float(msg.fields[1]))
        heap_free = int(float(msg.fields[2]))
    except ValueError:
        return None
    return StackStatus(task_name=task,
                       stack_high_water=stack_hw,
                       heap_free_bytes=heap_free)