yincheng.zhong
2025-12-04 f88f3da8f132cd1dd321dfc584a1fe68b6eb2138
python/hitl/protocols.py
@@ -1,19 +1,28 @@
"""
与 STM32H7 交互的协议封装:
- 构造 $GPRMI / $GPIMU 句子
- 构造 IM23A "fmin"/"fmim" 传感器帧
- 解析 PythonLink 控制帧
- 解析 ASCII 状态报文
"""
from __future__ import annotations
import math
import struct
import math
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Optional, Tuple
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
    """Map a PWM value to a signed speed in the range [-max_speed, +max_speed].

    The distance of ``value`` from ``center`` is expressed as a fraction of
    ``span``, limited to [-1.0, 1.0] by ``_clamp``, and then scaled by
    ``max_speed``. A value equal to ``center`` therefore maps to 0.0.
    """
    offset = value - center
    normalized = _clamp(offset / span, -1.0, 1.0)
    return normalized * max_speed
# Zero point for the GPS week / time-of-week arithmetic: 1980-01-06 00:00:00 UTC.
GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)
# Frame sizes in bytes.
# NOTE(review): these equal what the binary writers in this file emit
# (nav: 4 header + 92 payload + 2 checksum + 2 tail; imu: 4 + 44 + 2 + 2) —
# confirm they also match the firmware-side parser.
IM23A_NAV_LEN = 100
IM23A_IMU_LEN = 52
# 4-byte markers placed at the start of a navigation ("fmin") / IMU ("fmim") frame.
IM23A_NAV_HEADER = b"fmin"
IM23A_IMU_HEADER = b"fmim"
# 2-byte marker appended after the checksum at the end of a frame.
IM23A_TAIL = b"ed"
def _ensure_utc(dt: datetime) -> datetime:
@@ -22,31 +31,36 @@
    return dt.astimezone(timezone.utc)
def gps_week_and_tow(timestamp: datetime) -> Tuple[int, float]:
    """Split a timestamp into a week count and seconds into that week.

    The elapsed time is measured from ``GPS_EPOCH`` after the timestamp has
    been passed through ``_ensure_utc``. It is plain ``timedelta``
    arithmetic; no leap-second offset is applied here.

    Returns:
        ``(week, tow)`` where ``week`` is the number of whole 604800-second
        weeks since the epoch and ``tow`` is the remaining seconds (float).
    """
    seconds_per_week = 604800
    elapsed = (_ensure_utc(timestamp) - GPS_EPOCH).total_seconds()
    week_number = int(elapsed // seconds_per_week)
    seconds_into_week = elapsed - week_number * seconds_per_week
    return week_number, seconds_into_week
def format_hhmmss(timestamp: datetime, decimals: int = 3) -> str:
    """Format the time of day of ``timestamp`` as ``HHMMSS.fff``.

    The timestamp is first normalised with ``_ensure_utc``. The sub-second
    part is rounded to ``decimals`` digits; when that rounding reaches a full
    second (e.g. 12:00:00.9996 at three decimals) the carry is propagated
    into the ``HHMMSS`` part, so the result is ``120001.000`` rather than a
    value that appears to jump back to the start of the same second.

    Args:
        timestamp: Moment to format.
        decimals: Number of fractional-second digits. With ``0`` the result
            is the rounded ``HHMMSS`` without a decimal point.

    Returns:
        The formatted time-of-day string.
    """
    # Local import: the module-level datetime import does not include timedelta.
    from datetime import timedelta

    dt = _ensure_utc(timestamp)
    frac = dt.microsecond / 1_000_000.0
    # partition() tolerates the "no decimal point" output produced by decimals=0.
    whole, _, frac_str = f"{frac:.{decimals}f}".partition(".")
    if whole != "0":
        # The fraction rounded up to 1.0: carry one second into HHMMSS.
        dt = dt.replace(microsecond=0) + timedelta(seconds=1)
    base = dt.strftime("%H%M%S")
    return f"{base}.{frac_str}" if frac_str else base
def nmea_checksum(sentence_without_star: str) -> str:
    """Return the NMEA checksum of a sentence as two uppercase hex digits.

    Args:
        sentence_without_star: Sentence text starting with ``$`` and ending
            just before the ``*`` separator. The first character is skipped
            unconditionally.

    Returns:
        XOR of the code points of every character after the first one,
        formatted as ``%02X``. An empty or one-character input yields ``"00"``.
    """
    cs = 0
    for ch in sentence_without_star[1:]:  # skip the leading '$'
        cs ^= ord(ch)
    return f"{cs:02X}"
def build_gprmi_sentence(
def _write_double(buf: bytearray, value: float) -> None:
    buf.extend(struct.pack("<d", float(value)))
def _write_float(buf: bytearray, value: float) -> None:
    buf.extend(struct.pack("<f", float(value)))
def _write_u32(buf: bytearray, value: int) -> None:
    buf.extend(struct.pack("<I", int(value)))
def _write_u16(buf: bytearray, value: int) -> None:
    buf.extend(struct.pack("<H", int(value)))
def _append_tail(buf: bytearray) -> None:
    """Terminate a frame in place with its checksum and end marker.

    The checksum is the sum of every byte already in ``buf``, truncated to
    16 bits and written little-endian; ``IM23A_TAIL`` follows it.
    """
    low16_sum = sum(buf) & 0xFFFF  # computed before anything is appended
    buf += struct.pack("<H", low16_sum) + IM23A_TAIL
def build_im23a_nav_frame(
    timestamp: datetime,
    lat_deg: float,
    lon_deg: float,
@@ -57,88 +71,64 @@
    heading_deg: float,
    pitch_deg: float,
    roll_deg: float,
    *,
    lat_std: float = 0.008,
    lon_std: float = 0.008,
    alt_std: float = 0.02,
    vel_std: float = 0.02,
    heading_std: float = 0.1,
    pitch_std: float = 0.1,
    roll_std: float = 0.1,
    baseline_m: float = 1.0,
    sat_count: int = 20,
    ambiguity_count: int = 18,
    quality: int = 4,
    accel_bias: Tuple[float, float, float],
    gyro_bias: Tuple[float, float, float],
    temperature_c: float,
    status_flags: int,
) -> bytes:
    timestamp = _ensure_utc(timestamp)
    utc_str = format_hhmmss(timestamp, decimals=2)
    week, tow = gps_week_and_tow(timestamp)
    fields = [
        utc_str,
        str(week),
        f"{tow:.3f}",
        f"{lat_deg:.9f}",
        f"{lon_deg:.9f}",
        f"{alt_m:.3f}",
        f"{lat_std:.3f}",
        f"{lon_std:.3f}",
        f"{alt_std:.3f}",
        f"{east_vel:.3f}",
        f"{north_vel:.3f}",
        f"{up_vel:.3f}",
        f"{vel_std:.3f}",
        f"{heading_deg:.3f}",
        f"{pitch_deg:.3f}",
        f"{roll_deg:.3f}",
        f"{heading_std:.3f}",
        f"{pitch_std:.3f}",
        f"{roll_std:.3f}",
        f"{baseline_m:.3f}",
        str(int(sat_count)),
        str(int(ambiguity_count)),
        str(int(quality)),
    ]
    body = "$GPFMI," + ",".join(fields)
    checksum = nmea_checksum(body)
    sentence = f"{body}*{checksum}\r\n"
    return sentence.encode("ascii")
    buf = bytearray(IM23A_NAV_HEADER)
    _write_double(buf, _seconds_of_day(timestamp))
    _write_double(buf, lat_deg)
    _write_double(buf, lon_deg)
    _write_double(buf, alt_m)
    _write_float(buf, north_vel)
    _write_float(buf, east_vel)
    _write_float(buf, -up_vel)  # IM23A 向下正,MCU 内部转换回向上
    _write_float(buf, roll_deg)
    _write_float(buf, pitch_deg)
    _write_float(buf, heading_deg)
    _write_float(buf, 0.02)  # 定位精度,可根据需要调整
    _write_float(buf, accel_bias[0])
    _write_float(buf, accel_bias[1])
    _write_float(buf, accel_bias[2])
    _write_float(buf, gyro_bias[0])
    _write_float(buf, gyro_bias[1])
    _write_float(buf, gyro_bias[2])
    _write_float(buf, temperature_c)
    _write_u32(buf, status_flags)
    _append_tail(buf)
    return bytes(buf)
def build_gpimu_sentence(
def build_im23a_imu_frame(
    timestamp: datetime,
    accel_g: Tuple[float, float, float],
    gyro_deg_s: Tuple[float, float, float],
    temperature_c: float,
    reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0),
) -> bytes:
    """Build one binary IM23A "fmim" IMU frame.

    Layout, all little-endian:
        header ``IM23A_IMU_HEADER`` | double seconds-of-day |
        3 x float accel | 3 x float gyro | 3 x float reserves |
        16-bit byte-sum checksum | ``IM23A_TAIL``

    A leftover ``$GPIMU`` text path used to return before this writer ran,
    leaving the binary code unreachable and ``reserves`` unused; that path
    has been removed so the function produces the frame its name promises.

    Args:
        timestamp: Sample time; encoded via ``_seconds_of_day``.
        accel_g: Three accelerometer components (indices 0..2 are written).
        gyro_deg_s: Three gyroscope components (indices 0..2 are written).
        temperature_c: Kept so existing call sites keep working; it is not
            written because the frame layout above has no temperature field.
            NOTE(review): confirm whether callers still need to pass it.
        reserves: Three values for the reserved float slots.

    Returns:
        The complete frame as ``bytes``.
    """
    buf = bytearray(IM23A_IMU_HEADER)
    _write_double(buf, _seconds_of_day(timestamp))
    # Explicit 0..2 indexing keeps the original tolerance for longer sequences.
    for triple in (accel_g, gyro_deg_s, reserves):
        for axis in range(3):
            _write_float(buf, triple[axis])
    _append_tail(buf)
    return bytes(buf)
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
    """Map a PWM value to a signed speed in the range [-max_speed, +max_speed].

    The distance of ``value`` from ``center`` is expressed as a fraction of
    ``span``, limited to [-1.0, 1.0] by ``_clamp``, and then scaled by
    ``max_speed``. A value equal to ``center`` therefore maps to 0.0.
    """
    offset = value - center
    normalized = _clamp(offset / span, -1.0, 1.0)
    return normalized * max_speed
def _clamp(val: float, min_val: float, max_val: float) -> float:
    return max(min_val, min(max_val, val))
def _seconds_of_day(dt: datetime) -> float:
    """Return the seconds elapsed since midnight for ``dt``.

    ``dt`` is normalised with ``_ensure_utc`` first; the result includes the
    microsecond part as a fraction.
    """
    utc = _ensure_utc(dt)
    whole_seconds = utc.hour * 3600 + utc.minute * 60 + utc.second
    return whole_seconds + utc.microsecond / 1_000_000.0
def _decode_payload(payload: bytes) -> Tuple[float, float]: