From f88f3da8f132cd1dd321dfc584a1fe68b6eb2138 Mon Sep 17 00:00:00 2001
From: yincheng.zhong <634916154@qq.com>
Date: 星期四, 04 十二月 2025 21:49:01 +0800
Subject: [PATCH] 在跑校准了,还是有些问题,GPS坐标有时候不更新
---
python/hitl/protocols.py | 180 ++++++++++++++++++++++++++++-------------------------------
1 files changed, 85 insertions(+), 95 deletions(-)
diff --git a/python/hitl/protocols.py b/python/hitl/protocols.py
index 9a29fc3..a542018 100644
--- a/python/hitl/protocols.py
+++ b/python/hitl/protocols.py
@@ -1,19 +1,28 @@
"""
涓� STM32H7 浜や簰鐨勫崗璁皝瑁咃細
-- 鏋勯�� $GPRMI / $GPIMU 鍙ュ瓙
+- 鏋勯�� IM23A "fmin"/"fmim" 浼犳劅鍣ㄥ抚
- 瑙f瀽 PythonLink 鎺у埗甯�
- 瑙f瀽 ASCII 鐘舵�佹姤鏂�
"""
from __future__ import annotations
-import math
import struct
+import math
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Optional, Tuple
+def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
+ ratio = _clamp((value - center) / span, -1.0, 1.0)
+ return ratio * max_speed
-GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)
+
+
+IM23A_NAV_LEN = 100
+IM23A_IMU_LEN = 52
+IM23A_NAV_HEADER = b"fmin"
+IM23A_IMU_HEADER = b"fmim"
+IM23A_TAIL = b"ed"
def _ensure_utc(dt: datetime) -> datetime:
@@ -22,31 +31,36 @@
return dt.astimezone(timezone.utc)
-def gps_week_and_tow(timestamp: datetime) -> Tuple[int, float]:
- dt = _ensure_utc(timestamp)
- delta = dt - GPS_EPOCH
- total_seconds = delta.total_seconds()
- week = int(total_seconds // 604800)
- tow = total_seconds - week * 604800
- return week, tow
-
-
-def format_hhmmss(timestamp: datetime, decimals: int = 3) -> str:
- dt = _ensure_utc(timestamp)
- base = dt.strftime("%H%M%S")
- frac = dt.microsecond / 1_000_000.0
- frac_str = f"{frac:.{decimals}f}".split(".")[1]
- return f"{base}.{frac_str}"
-
-
def nmea_checksum(sentence_without_star: str) -> str:
cs = 0
- for ch in sentence_without_star[1:]: # 璺宠繃 $
+ for ch in sentence_without_star[1:]:
cs ^= ord(ch)
return f"{cs:02X}"
-def build_gprmi_sentence(
+def _write_double(buf: bytearray, value: float) -> None:
+ buf.extend(struct.pack("<d", float(value)))
+
+
+def _write_float(buf: bytearray, value: float) -> None:
+ buf.extend(struct.pack("<f", float(value)))
+
+
+def _write_u32(buf: bytearray, value: int) -> None:
+ buf.extend(struct.pack("<I", int(value)))
+
+
+def _write_u16(buf: bytearray, value: int) -> None:
+ buf.extend(struct.pack("<H", int(value)))
+
+
+def _append_tail(buf: bytearray) -> None:
+ checksum = sum(buf) & 0xFFFF
+ buf.extend(struct.pack("<H", checksum))
+ buf.extend(IM23A_TAIL)
+
+
+def build_im23a_nav_frame(
timestamp: datetime,
lat_deg: float,
lon_deg: float,
@@ -57,88 +71,64 @@
heading_deg: float,
pitch_deg: float,
roll_deg: float,
- *,
- lat_std: float = 0.008,
- lon_std: float = 0.008,
- alt_std: float = 0.02,
- vel_std: float = 0.02,
- heading_std: float = 0.1,
- pitch_std: float = 0.1,
- roll_std: float = 0.1,
- baseline_m: float = 1.0,
- sat_count: int = 20,
- ambiguity_count: int = 18,
- quality: int = 4,
+ accel_bias: Tuple[float, float, float],
+ gyro_bias: Tuple[float, float, float],
+ temperature_c: float,
+ status_flags: int,
) -> bytes:
- timestamp = _ensure_utc(timestamp)
- utc_str = format_hhmmss(timestamp, decimals=2)
- week, tow = gps_week_and_tow(timestamp)
-
- fields = [
- utc_str,
- str(week),
- f"{tow:.3f}",
- f"{lat_deg:.9f}",
- f"{lon_deg:.9f}",
- f"{alt_m:.3f}",
- f"{lat_std:.3f}",
- f"{lon_std:.3f}",
- f"{alt_std:.3f}",
- f"{east_vel:.3f}",
- f"{north_vel:.3f}",
- f"{up_vel:.3f}",
- f"{vel_std:.3f}",
- f"{heading_deg:.3f}",
- f"{pitch_deg:.3f}",
- f"{roll_deg:.3f}",
- f"{heading_std:.3f}",
- f"{pitch_std:.3f}",
- f"{roll_std:.3f}",
- f"{baseline_m:.3f}",
- str(int(sat_count)),
- str(int(ambiguity_count)),
- str(int(quality)),
- ]
-
- body = "$GPFMI," + ",".join(fields)
- checksum = nmea_checksum(body)
- sentence = f"{body}*{checksum}\r\n"
- return sentence.encode("ascii")
+ buf = bytearray(IM23A_NAV_HEADER)
+ _write_double(buf, _seconds_of_day(timestamp))
+ _write_double(buf, lat_deg)
+ _write_double(buf, lon_deg)
+ _write_double(buf, alt_m)
+ _write_float(buf, north_vel)
+ _write_float(buf, east_vel)
+ _write_float(buf, -up_vel) # IM23A 鍚戜笅姝o紝MCU 鍐呴儴杞崲鍥炲悜涓�
+ _write_float(buf, roll_deg)
+ _write_float(buf, pitch_deg)
+ _write_float(buf, heading_deg)
+ _write_float(buf, 0.02) # 瀹氫綅绮惧害锛屽彲鏍规嵁闇�瑕佽皟鏁�
+ _write_float(buf, accel_bias[0])
+ _write_float(buf, accel_bias[1])
+ _write_float(buf, accel_bias[2])
+ _write_float(buf, gyro_bias[0])
+ _write_float(buf, gyro_bias[1])
+ _write_float(buf, gyro_bias[2])
+ _write_float(buf, temperature_c)
+ _write_u32(buf, status_flags)
+ _append_tail(buf)
+ return bytes(buf)
-def build_gpimu_sentence(
+def build_im23a_imu_frame(
timestamp: datetime,
accel_g: Tuple[float, float, float],
gyro_deg_s: Tuple[float, float, float],
- temperature_c: float,
+ reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0),
) -> bytes:
- timestamp = _ensure_utc(timestamp)
- utc_str = format_hhmmss(timestamp, decimals=3)
-
- fields = [
- utc_str,
- f"{accel_g[0]:+.4f}",
- f"{accel_g[1]:+.4f}",
- f"{accel_g[2]:+.4f}",
- f"{gyro_deg_s[0]:+.4f}",
- f"{gyro_deg_s[1]:+.4f}",
- f"{gyro_deg_s[2]:+.4f}",
- f"{temperature_c:.2f}",
- ]
-
- body = "$GPIMU," + ",".join(fields)
- checksum = nmea_checksum(body)
- sentence = f"{body}*{checksum}\r\n"
- return sentence.encode("ascii")
+ buf = bytearray(IM23A_IMU_HEADER)
+ _write_double(buf, _seconds_of_day(timestamp))
+ _write_float(buf, accel_g[0])
+ _write_float(buf, accel_g[1])
+ _write_float(buf, accel_g[2])
+ _write_float(buf, gyro_deg_s[0])
+ _write_float(buf, gyro_deg_s[1])
+ _write_float(buf, gyro_deg_s[2])
+ _write_float(buf, reserves[0])
+ _write_float(buf, reserves[1])
+ _write_float(buf, reserves[2])
+ _append_tail(buf)
+ return bytes(buf)
-def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
- ratio = _clamp((value - center) / span, -1.0, 1.0)
- return ratio * max_speed
-
-
-def _clamp(val: float, min_val: float, max_val: float) -> float:
- return max(min_val, min(max_val, val))
+def _seconds_of_day(dt: datetime) -> float:
+ utc = _ensure_utc(dt)
+ return (
+ utc.hour * 3600
+ + utc.minute * 60
+ + utc.second
+ + utc.microsecond / 1_000_000.0
+ )
def _decode_payload(payload: bytes) -> Tuple[float, float]:
--
Gitblit v1.10.0