From f88f3da8f132cd1dd321dfc584a1fe68b6eb2138 Mon Sep 17 00:00:00 2001
From: yincheng.zhong <634916154@qq.com>
Date: 星期四, 04 十二月 2025 21:49:01 +0800
Subject: [PATCH] 在跑校准了,还是有些问题,GPS坐标有时候不更新

---
 python/hitl/protocols.py |  180 ++++++++++++++++++++++++++++-------------------------------
 1 files changed, 85 insertions(+), 95 deletions(-)

diff --git a/python/hitl/protocols.py b/python/hitl/protocols.py
index 9a29fc3..a542018 100644
--- a/python/hitl/protocols.py
+++ b/python/hitl/protocols.py
@@ -1,19 +1,28 @@
 """
 涓� STM32H7 浜や簰鐨勫崗璁皝瑁咃細
-- 鏋勯�� $GPRMI / $GPIMU 鍙ュ瓙
+- 鏋勯�� IM23A "fmin"/"fmim" 浼犳劅鍣ㄥ抚
 - 瑙f瀽 PythonLink 鎺у埗甯�
 - 瑙f瀽 ASCII 鐘舵�佹姤鏂�
 """
 
 from __future__ import annotations
 
-import math
 import struct
+import math
 from dataclasses import dataclass
 from datetime import datetime, timezone
 from typing import Callable, Optional, Tuple
+def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
+    ratio = _clamp((value - center) / span, -1.0, 1.0)
+    return ratio * max_speed
 
-GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)
+
+
+IM23A_NAV_LEN = 100
+IM23A_IMU_LEN = 52
+IM23A_NAV_HEADER = b"fmin"
+IM23A_IMU_HEADER = b"fmim"
+IM23A_TAIL = b"ed"
 
 
 def _ensure_utc(dt: datetime) -> datetime:
@@ -22,31 +31,36 @@
     return dt.astimezone(timezone.utc)
 
 
-def gps_week_and_tow(timestamp: datetime) -> Tuple[int, float]:
-    dt = _ensure_utc(timestamp)
-    delta = dt - GPS_EPOCH
-    total_seconds = delta.total_seconds()
-    week = int(total_seconds // 604800)
-    tow = total_seconds - week * 604800
-    return week, tow
-
-
-def format_hhmmss(timestamp: datetime, decimals: int = 3) -> str:
-    dt = _ensure_utc(timestamp)
-    base = dt.strftime("%H%M%S")
-    frac = dt.microsecond / 1_000_000.0
-    frac_str = f"{frac:.{decimals}f}".split(".")[1]
-    return f"{base}.{frac_str}"
-
-
 def nmea_checksum(sentence_without_star: str) -> str:
     cs = 0
-    for ch in sentence_without_star[1:]:  # 璺宠繃 $
+    for ch in sentence_without_star[1:]:
         cs ^= ord(ch)
     return f"{cs:02X}"
 
 
-def build_gprmi_sentence(
+def _write_double(buf: bytearray, value: float) -> None:
+    buf.extend(struct.pack("<d", float(value)))
+
+
+def _write_float(buf: bytearray, value: float) -> None:
+    buf.extend(struct.pack("<f", float(value)))
+
+
+def _write_u32(buf: bytearray, value: int) -> None:
+    buf.extend(struct.pack("<I", int(value)))
+
+
+def _write_u16(buf: bytearray, value: int) -> None:
+    buf.extend(struct.pack("<H", int(value)))
+
+
+def _append_tail(buf: bytearray) -> None:
+    checksum = sum(buf) & 0xFFFF
+    buf.extend(struct.pack("<H", checksum))
+    buf.extend(IM23A_TAIL)
+
+
+def build_im23a_nav_frame(
     timestamp: datetime,
     lat_deg: float,
     lon_deg: float,
@@ -57,88 +71,64 @@
     heading_deg: float,
     pitch_deg: float,
     roll_deg: float,
-    *,
-    lat_std: float = 0.008,
-    lon_std: float = 0.008,
-    alt_std: float = 0.02,
-    vel_std: float = 0.02,
-    heading_std: float = 0.1,
-    pitch_std: float = 0.1,
-    roll_std: float = 0.1,
-    baseline_m: float = 1.0,
-    sat_count: int = 20,
-    ambiguity_count: int = 18,
-    quality: int = 4,
+    accel_bias: Tuple[float, float, float],
+    gyro_bias: Tuple[float, float, float],
+    temperature_c: float,
+    status_flags: int,
 ) -> bytes:
-    timestamp = _ensure_utc(timestamp)
-    utc_str = format_hhmmss(timestamp, decimals=2)
-    week, tow = gps_week_and_tow(timestamp)
-
-    fields = [
-        utc_str,
-        str(week),
-        f"{tow:.3f}",
-        f"{lat_deg:.9f}",
-        f"{lon_deg:.9f}",
-        f"{alt_m:.3f}",
-        f"{lat_std:.3f}",
-        f"{lon_std:.3f}",
-        f"{alt_std:.3f}",
-        f"{east_vel:.3f}",
-        f"{north_vel:.3f}",
-        f"{up_vel:.3f}",
-        f"{vel_std:.3f}",
-        f"{heading_deg:.3f}",
-        f"{pitch_deg:.3f}",
-        f"{roll_deg:.3f}",
-        f"{heading_std:.3f}",
-        f"{pitch_std:.3f}",
-        f"{roll_std:.3f}",
-        f"{baseline_m:.3f}",
-        str(int(sat_count)),
-        str(int(ambiguity_count)),
-        str(int(quality)),
-    ]
-
-    body = "$GPFMI," + ",".join(fields)
-    checksum = nmea_checksum(body)
-    sentence = f"{body}*{checksum}\r\n"
-    return sentence.encode("ascii")
+    buf = bytearray(IM23A_NAV_HEADER)
+    _write_double(buf, _seconds_of_day(timestamp))
+    _write_double(buf, lat_deg)
+    _write_double(buf, lon_deg)
+    _write_double(buf, alt_m)
+    _write_float(buf, north_vel)
+    _write_float(buf, east_vel)
+    _write_float(buf, -up_vel)  # IM23A 鍚戜笅姝o紝MCU 鍐呴儴杞崲鍥炲悜涓�
+    _write_float(buf, roll_deg)
+    _write_float(buf, pitch_deg)
+    _write_float(buf, heading_deg)
+    _write_float(buf, 0.02)  # 瀹氫綅绮惧害锛屽彲鏍规嵁闇�瑕佽皟鏁�
+    _write_float(buf, accel_bias[0])
+    _write_float(buf, accel_bias[1])
+    _write_float(buf, accel_bias[2])
+    _write_float(buf, gyro_bias[0])
+    _write_float(buf, gyro_bias[1])
+    _write_float(buf, gyro_bias[2])
+    _write_float(buf, temperature_c)
+    _write_u32(buf, status_flags)
+    _append_tail(buf)
+    return bytes(buf)
 
 
-def build_gpimu_sentence(
+def build_im23a_imu_frame(
     timestamp: datetime,
     accel_g: Tuple[float, float, float],
     gyro_deg_s: Tuple[float, float, float],
-    temperature_c: float,
+    reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0),
 ) -> bytes:
-    timestamp = _ensure_utc(timestamp)
-    utc_str = format_hhmmss(timestamp, decimals=3)
-
-    fields = [
-        utc_str,
-        f"{accel_g[0]:+.4f}",
-        f"{accel_g[1]:+.4f}",
-        f"{accel_g[2]:+.4f}",
-        f"{gyro_deg_s[0]:+.4f}",
-        f"{gyro_deg_s[1]:+.4f}",
-        f"{gyro_deg_s[2]:+.4f}",
-        f"{temperature_c:.2f}",
-    ]
-
-    body = "$GPIMU," + ",".join(fields)
-    checksum = nmea_checksum(body)
-    sentence = f"{body}*{checksum}\r\n"
-    return sentence.encode("ascii")
+    buf = bytearray(IM23A_IMU_HEADER)
+    _write_double(buf, _seconds_of_day(timestamp))
+    _write_float(buf, accel_g[0])
+    _write_float(buf, accel_g[1])
+    _write_float(buf, accel_g[2])
+    _write_float(buf, gyro_deg_s[0])
+    _write_float(buf, gyro_deg_s[1])
+    _write_float(buf, gyro_deg_s[2])
+    _write_float(buf, reserves[0])
+    _write_float(buf, reserves[1])
+    _write_float(buf, reserves[2])
+    _append_tail(buf)
+    return bytes(buf)
 
 
-def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
-    ratio = _clamp((value - center) / span, -1.0, 1.0)
-    return ratio * max_speed
-
-
-def _clamp(val: float, min_val: float, max_val: float) -> float:
-    return max(min_val, min(max_val, val))
+def _seconds_of_day(dt: datetime) -> float:
+    utc = _ensure_utc(dt)
+    return (
+        utc.hour * 3600
+        + utc.minute * 60
+        + utc.second
+        + utc.microsecond / 1_000_000.0
+    )
 
 
 def _decode_payload(payload: bytes) -> Tuple[float, float]:

--
Gitblit v1.10.0