yincheng.zhong
2025-12-10 8d0d8dec18cc9170f3fb82a4dba18160dd8e5233
python/hitl/protocols.py
@@ -1,19 +1,72 @@
"""
与 STM32H7 交互的协议封装:
- 构造 $GPRMI / $GPIMU 句子
- 构造 IM23A "fmin"/"fmim" 传感器帧
- 解析 PythonLink 控制帧
- 解析 ASCII 状态报文
"""
from __future__ import annotations
import math
import struct
import math
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Optional, Tuple
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
    """
    将 PWM 信号转换为线速度或角速度。
    基于 motion_config.h 中的模型参数:
    Forward: v = K_fwd * (1500 - pwm) + Bias (但模拟器这里主要是为了闭环,不需要完美匹配Bias,只要方向对)
             1000 PWM = 0.56 m/s
             1500 PWM = 0.0 m/s
             2000 PWM = -0.2 m/s (backward)
    Turn:    w = K_turn * (1500 - pwm)
             1000 PWM = 63 deg/s (Left)
             2000 PWM = -61.5 deg/s (Right)
    """
    if value < 1000: value = 1000
    if value > 2000: value = 2000
    # 使用参数
    # MC_CFG_FORWARD_K = 0.00196
    # MC_CFG_FORWARD_BIAS = -0.077
    # MC_CFG_STEERING_K_LEFT = 0.00206
    # MC_CFG_STEERING_K_RIGHT = 0.00201
    # 简单的线性映射用于模拟,忽略死区和偏置的细节,保证大致范围正确即可
    # 真实速度 = K * (1500 - pwm) + Bias
    delta = 1500 - value
    # 模拟器需要返回速度值,而不是设置速度。
    # 这里的 max_speed 参数不再直接作为比例系数,而是用于指示是线速度还是角速度的缩放
    if max_speed < 10.0: # 线速度模式 (m/s)
        # 假设是 Forward PWM
        # 1000 -> 500 diff -> 0.56 m/s => 0.00112 m/s/us
        # 2000 -> -500 diff -> -0.56 m/s (实际车可能是0.2,但模拟器可以对称)
        # 使用平均 K = 0.0015
        k_lin = 0.0015
        if abs(delta) < 40: return 0.0 # 死区
        return delta * k_lin
    else: # 角速度模式 (deg/s -> rad/s)
        # 假设是 Steering PWM
        # 1000 -> 500 diff -> 63 deg/s => 0.126 deg/s/us
        # 2000 -> -500 diff -> -61.5 deg/s
        # K_ang ~ 0.12 deg/s/us
        k_ang_deg = 0.12
        if abs(delta) < 10: return 0.0 # 死区
        deg_s = delta * k_ang_deg
        # 转换为 rad/s ? 下面调用是 math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
        # 所以这里只需要返回 deg/s,调用者会转 rad/s
        return deg_s
GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)
IM23A_NAV_LEN = 100
IM23A_IMU_LEN = 52
IM23A_NAV_HEADER = b"fmin"
IM23A_IMU_HEADER = b"fmim"
IM23A_TAIL = b"ed"
def _ensure_utc(dt: datetime) -> datetime:
@@ -22,31 +75,36 @@
    return dt.astimezone(timezone.utc)
def gps_week_and_tow(timestamp: datetime) -> Tuple[int, float]:
    dt = _ensure_utc(timestamp)
    delta = dt - GPS_EPOCH
    total_seconds = delta.total_seconds()
    week = int(total_seconds // 604800)
    tow = total_seconds - week * 604800
    return week, tow
def format_hhmmss(timestamp: datetime, decimals: int = 3) -> str:
    dt = _ensure_utc(timestamp)
    base = dt.strftime("%H%M%S")
    frac = dt.microsecond / 1_000_000.0
    frac_str = f"{frac:.{decimals}f}".split(".")[1]
    return f"{base}.{frac_str}"
def nmea_checksum(sentence_without_star: str) -> str:
    cs = 0
    for ch in sentence_without_star[1:]:  # 跳过 $
    for ch in sentence_without_star[1:]:
        cs ^= ord(ch)
    return f"{cs:02X}"
def build_gprmi_sentence(
def _write_double(buf: bytearray, value: float) -> None:
    buf.extend(struct.pack("<d", float(value)))
def _write_float(buf: bytearray, value: float) -> None:
    buf.extend(struct.pack("<f", float(value)))
def _write_u32(buf: bytearray, value: int) -> None:
    buf.extend(struct.pack("<I", int(value)))
def _write_u16(buf: bytearray, value: int) -> None:
    buf.extend(struct.pack("<H", int(value)))
def _append_tail(buf: bytearray) -> None:
    checksum = sum(buf) & 0xFFFF
    buf.extend(struct.pack("<H", checksum))
    buf.extend(IM23A_TAIL)
def build_im23a_nav_frame(
    timestamp: datetime,
    lat_deg: float,
    lon_deg: float,
@@ -54,91 +112,67 @@
    east_vel: float,
    north_vel: float,
    up_vel: float,
    heading_deg: float,
    heading_rad: float,
    pitch_deg: float,
    roll_deg: float,
    *,
    lat_std: float = 0.008,
    lon_std: float = 0.008,
    alt_std: float = 0.02,
    vel_std: float = 0.02,
    heading_std: float = 0.1,
    pitch_std: float = 0.1,
    roll_std: float = 0.1,
    baseline_m: float = 1.0,
    sat_count: int = 20,
    ambiguity_count: int = 18,
    quality: int = 4,
    accel_bias: Tuple[float, float, float],
    gyro_bias: Tuple[float, float, float],
    temperature_c: float,
    status_flags: int,
) -> bytes:
    timestamp = _ensure_utc(timestamp)
    utc_str = format_hhmmss(timestamp, decimals=2)
    week, tow = gps_week_and_tow(timestamp)
    fields = [
        utc_str,
        str(week),
        f"{tow:.3f}",
        f"{lat_deg:.9f}",
        f"{lon_deg:.9f}",
        f"{alt_m:.3f}",
        f"{lat_std:.3f}",
        f"{lon_std:.3f}",
        f"{alt_std:.3f}",
        f"{east_vel:.3f}",
        f"{north_vel:.3f}",
        f"{up_vel:.3f}",
        f"{vel_std:.3f}",
        f"{heading_deg:.3f}",
        f"{pitch_deg:.3f}",
        f"{roll_deg:.3f}",
        f"{heading_std:.3f}",
        f"{pitch_std:.3f}",
        f"{roll_std:.3f}",
        f"{baseline_m:.3f}",
        str(int(sat_count)),
        str(int(ambiguity_count)),
        str(int(quality)),
    ]
    body = "$GPFMI," + ",".join(fields)
    checksum = nmea_checksum(body)
    sentence = f"{body}*{checksum}\r\n"
    return sentence.encode("ascii")
    buf = bytearray(IM23A_NAV_HEADER)
    _write_double(buf, _seconds_of_day(timestamp))
    _write_double(buf, lat_deg)
    _write_double(buf, lon_deg)
    _write_double(buf, alt_m)
    _write_float(buf, north_vel)
    _write_float(buf, east_vel)
    _write_float(buf, -up_vel)  # IM23A 向下正,MCU 内部转换回向上
    _write_float(buf, roll_deg)
    _write_float(buf, pitch_deg)
    _write_float(buf, heading_rad)  # 航向角:弧度值,范围-3.14到3.14,0度正北方向
    _write_float(buf, 0.02)  # 定位精度,可根据需要调整
    _write_float(buf, accel_bias[0])
    _write_float(buf, accel_bias[1])
    _write_float(buf, accel_bias[2])
    _write_float(buf, gyro_bias[0])
    _write_float(buf, gyro_bias[1])
    _write_float(buf, gyro_bias[2])
    _write_float(buf, temperature_c)
    _write_u32(buf, status_flags)
    _append_tail(buf)
    return bytes(buf)
def build_gpimu_sentence(
def build_im23a_imu_frame(
    timestamp: datetime,
    accel_g: Tuple[float, float, float],
    gyro_deg_s: Tuple[float, float, float],
    temperature_c: float,
    reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0),
) -> bytes:
    timestamp = _ensure_utc(timestamp)
    utc_str = format_hhmmss(timestamp, decimals=3)
    fields = [
        utc_str,
        f"{accel_g[0]:+.4f}",
        f"{accel_g[1]:+.4f}",
        f"{accel_g[2]:+.4f}",
        f"{gyro_deg_s[0]:+.4f}",
        f"{gyro_deg_s[1]:+.4f}",
        f"{gyro_deg_s[2]:+.4f}",
        f"{temperature_c:.2f}",
    ]
    body = "$GPIMU," + ",".join(fields)
    checksum = nmea_checksum(body)
    sentence = f"{body}*{checksum}\r\n"
    return sentence.encode("ascii")
    buf = bytearray(IM23A_IMU_HEADER)
    _write_double(buf, _seconds_of_day(timestamp))
    _write_float(buf, accel_g[0])
    _write_float(buf, accel_g[1])
    _write_float(buf, accel_g[2])
    _write_float(buf, gyro_deg_s[0])
    _write_float(buf, gyro_deg_s[1])
    _write_float(buf, gyro_deg_s[2])
    _write_float(buf, reserves[0])
    _write_float(buf, reserves[1])
    _write_float(buf, reserves[2])
    _append_tail(buf)
    return bytes(buf)
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
    ratio = _clamp((value - center) / span, -1.0, 1.0)
    return ratio * max_speed
def _clamp(val: float, min_val: float, max_val: float) -> float:
    return max(min_val, min(max_val, val))
def _seconds_of_day(dt: datetime) -> float:
    utc = _ensure_utc(dt)
    return (
        utc.hour * 3600
        + utc.minute * 60
        + utc.second
        + utc.microsecond / 1_000_000.0
    )
def _decode_payload(payload: bytes) -> Tuple[float, float]:
@@ -148,7 +182,7 @@
    if len(payload) == 4:
        steering_pwm, throttle_pwm = struct.unpack("<HH", payload)
        # 重新映射:转向 PWM → yaw rate;油门 PWM → 线速度
        forward = _pwm_to_velocity(throttle_pwm)
        forward = _pwm_to_velocity(throttle_pwm, max_speed=1.0)
        turn = math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
        return forward, turn
    raise ValueError(f"不支持的控制负载长度: {len(payload)}")