From 3c2021441490ae9b93f7a2ef0f379909b589edd9 Mon Sep 17 00:00:00 2001
From: yincheng.zhong <634916154@qq.com>
Date: 星期六, 13 十二月 2025 18:53:16 +0800
Subject: [PATCH] 应该是拍视频的版本,外包在此基础上加了MQTT部分。
---
python/hitl/protocols.py | 408 ++++++++++++++++++++++++++++++++++++++++++++-------------
1 files changed, 312 insertions(+), 96 deletions(-)
diff --git a/python/hitl/protocols.py b/python/hitl/protocols.py
index 0c9e898..06cad2e 100644
--- a/python/hitl/protocols.py
+++ b/python/hitl/protocols.py
@@ -1,18 +1,72 @@
"""
涓� STM32H7 浜や簰鐨勫崗璁皝瑁咃細
-- 鏋勯�� $GPRMI / $GPIMU 鍙ュ瓙
+- 鏋勯�� IM23A "fmin"/"fmim" 浼犳劅鍣ㄥ抚
- 瑙f瀽 PythonLink 鎺у埗甯�
+- 瑙f瀽 ASCII 鐘舵�佹姤鏂�
"""
from __future__ import annotations
-import math
import struct
+import math
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Optional, Tuple
+def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
+ """
+ 灏� PWM 淇″彿杞崲涓虹嚎閫熷害鎴栬閫熷害銆�
+ 鍩轰簬 motion_config.h 涓殑妯″瀷鍙傛暟锛�
+ Forward: v = K_fwd * (1500 - pwm) + Bias (浣嗘ā鎷熷櫒杩欓噷涓昏鏄负浜嗛棴鐜紝涓嶉渶瑕佸畬缇庡尮閰岯ias锛屽彧瑕佹柟鍚戝)
+ 1000 PWM = 0.56 m/s
+ 1500 PWM = 0.0 m/s
+ 2000 PWM = -0.2 m/s (backward)
+ Turn: w = K_turn * (1500 - pwm)
+ 1000 PWM = 63 deg/s (Left)
+ 2000 PWM = -61.5 deg/s (Right)
+ """
+ if value < 1000: value = 1000
+ if value > 2000: value = 2000
+
+ # 浣跨敤鍙傛暟
+ # MC_CFG_FORWARD_K = 0.00196
+ # MC_CFG_FORWARD_BIAS = -0.077
+ # MC_CFG_STEERING_K_LEFT = 0.00206
+ # MC_CFG_STEERING_K_RIGHT = 0.00201
+
+ # 绠�鍗曠殑绾挎�ф槧灏勭敤浜庢ā鎷燂紝蹇界暐姝诲尯鍜屽亸缃殑缁嗚妭锛屼繚璇佸ぇ鑷磋寖鍥存纭嵆鍙�
+ # 鐪熷疄閫熷害 = K * (1500 - pwm) + Bias
+
+ delta = 1500 - value
+ # 妯℃嫙鍣ㄩ渶瑕佽繑鍥為�熷害鍊硷紝鑰屼笉鏄缃�熷害銆�
+ # 杩欓噷鐨� max_speed 鍙傛暟涓嶅啀鐩存帴浣滀负姣斾緥绯绘暟锛岃�屾槸鐢ㄤ簬鎸囩ず鏄嚎閫熷害杩樻槸瑙掗�熷害鐨勭缉鏀�
+
+ if max_speed < 10.0: # 绾块�熷害妯″紡 (m/s)
+ # 鍋囪鏄� Forward PWM
+ # 1000 -> 500 diff -> 0.56 m/s => 0.00112 m/s/us
+ # 2000 -> -500 diff -> -0.56 m/s (瀹為檯杞﹀彲鑳芥槸0.2锛屼絾妯℃嫙鍣ㄥ彲浠ュ绉�)
+ # 浣跨敤骞冲潎 K = 0.0015
+ k_lin = 0.0015
+ if abs(delta) < 40: return 0.0 # 姝诲尯
+ return delta * k_lin
+ else: # 瑙掗�熷害妯″紡 (deg/s -> rad/s)
+ # 鍋囪鏄� Steering PWM
+ # 1000 -> 500 diff -> 63 deg/s => 0.126 deg/s/us
+ # 2000 -> -500 diff -> -61.5 deg/s
+ # K_ang ~ 0.12 deg/s/us
+ k_ang_deg = 0.12
+ if abs(delta) < 10: return 0.0 # 姝诲尯
+ deg_s = delta * k_ang_deg
+ # 杞崲涓� rad/s ? 涓嬮潰璋冪敤鏄� math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
+ # 鎵�浠ヨ繖閲屽彧闇�瑕佽繑鍥� deg/s锛岃皟鐢ㄨ�呬細杞� rad/s
+ return deg_s
-GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)
+
+
+IM23A_NAV_LEN = 100
+IM23A_IMU_LEN = 52
+IM23A_NAV_HEADER = b"fmin"
+IM23A_IMU_HEADER = b"fmim"
+IM23A_TAIL = b"ed"
def _ensure_utc(dt: datetime) -> datetime:
@@ -21,31 +75,36 @@
return dt.astimezone(timezone.utc)
-def gps_week_and_tow(timestamp: datetime) -> Tuple[int, float]:
- dt = _ensure_utc(timestamp)
- delta = dt - GPS_EPOCH
- total_seconds = delta.total_seconds()
- week = int(total_seconds // 604800)
- tow = total_seconds - week * 604800
- return week, tow
-
-
-def format_hhmmss(timestamp: datetime, decimals: int = 3) -> str:
- dt = _ensure_utc(timestamp)
- base = dt.strftime("%H%M%S")
- frac = dt.microsecond / 1_000_000.0
- frac_str = f"{frac:.{decimals}f}".split(".")[1]
- return f"{base}.{frac_str}"
-
-
def nmea_checksum(sentence_without_star: str) -> str:
cs = 0
- for ch in sentence_without_star[1:]: # 璺宠繃 $
+ for ch in sentence_without_star[1:]:
cs ^= ord(ch)
return f"{cs:02X}"
-def build_gprmi_sentence(
+def _write_double(buf: bytearray, value: float) -> None:
+ buf.extend(struct.pack("<d", float(value)))
+
+
+def _write_float(buf: bytearray, value: float) -> None:
+ buf.extend(struct.pack("<f", float(value)))
+
+
+def _write_u32(buf: bytearray, value: int) -> None:
+ buf.extend(struct.pack("<I", int(value)))
+
+
+def _write_u16(buf: bytearray, value: int) -> None:
+ buf.extend(struct.pack("<H", int(value)))
+
+
+def _append_tail(buf: bytearray) -> None:
+ checksum = sum(buf) & 0xFFFF
+ buf.extend(struct.pack("<H", checksum))
+ buf.extend(IM23A_TAIL)
+
+
+def build_im23a_nav_frame(
timestamp: datetime,
lat_deg: float,
lon_deg: float,
@@ -56,88 +115,64 @@
heading_deg: float,
pitch_deg: float,
roll_deg: float,
- *,
- lat_std: float = 0.008,
- lon_std: float = 0.008,
- alt_std: float = 0.02,
- vel_std: float = 0.02,
- heading_std: float = 0.1,
- pitch_std: float = 0.1,
- roll_std: float = 0.1,
- baseline_m: float = 1.0,
- sat_count: int = 20,
- ambiguity_count: int = 18,
- quality: int = 4,
+ accel_bias: Tuple[float, float, float],
+ gyro_bias: Tuple[float, float, float],
+ temperature_c: float,
+ status_flags: int,
) -> bytes:
- timestamp = _ensure_utc(timestamp)
- utc_str = format_hhmmss(timestamp, decimals=2)
- week, tow = gps_week_and_tow(timestamp)
-
- fields = [
- utc_str,
- str(week),
- f"{tow:.3f}",
- f"{lat_deg:.9f}",
- f"{lon_deg:.9f}",
- f"{alt_m:.3f}",
- f"{lat_std:.3f}",
- f"{lon_std:.3f}",
- f"{alt_std:.3f}",
- f"{east_vel:.3f}",
- f"{north_vel:.3f}",
- f"{up_vel:.3f}",
- f"{vel_std:.3f}",
- f"{heading_deg:.3f}",
- f"{pitch_deg:.3f}",
- f"{roll_deg:.3f}",
- f"{heading_std:.3f}",
- f"{pitch_std:.3f}",
- f"{roll_std:.3f}",
- f"{baseline_m:.3f}",
- str(int(sat_count)),
- str(int(ambiguity_count)),
- str(int(quality)),
- ]
-
- body = "$GPRMI," + ",".join(fields)
- checksum = nmea_checksum(body)
- sentence = f"{body}*{checksum}\r\n"
- return sentence.encode("ascii")
+ buf = bytearray(IM23A_NAV_HEADER)
+ _write_double(buf, _seconds_of_day(timestamp))
+ _write_double(buf, lat_deg)
+ _write_double(buf, lon_deg)
+ _write_double(buf, alt_m)
+ _write_float(buf, north_vel)
+ _write_float(buf, east_vel)
+ _write_float(buf, -up_vel) # IM23A 鍚戜笅姝o紝MCU 鍐呴儴杞崲鍥炲悜涓�
+ _write_float(buf, roll_deg)
+ _write_float(buf, pitch_deg)
+ _write_float(buf, heading_deg)
+ _write_float(buf, 0.02) # 瀹氫綅绮惧害锛屽彲鏍规嵁闇�瑕佽皟鏁�
+ _write_float(buf, accel_bias[0])
+ _write_float(buf, accel_bias[1])
+ _write_float(buf, accel_bias[2])
+ _write_float(buf, gyro_bias[0])
+ _write_float(buf, gyro_bias[1])
+ _write_float(buf, gyro_bias[2])
+ _write_float(buf, temperature_c)
+ _write_u32(buf, status_flags)
+ _append_tail(buf)
+ return bytes(buf)
-def build_gpimu_sentence(
+def build_im23a_imu_frame(
timestamp: datetime,
accel_g: Tuple[float, float, float],
gyro_deg_s: Tuple[float, float, float],
- temperature_c: float,
+ reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0),
) -> bytes:
- timestamp = _ensure_utc(timestamp)
- utc_str = format_hhmmss(timestamp, decimals=3)
-
- fields = [
- utc_str,
- f"{accel_g[0]:+.4f}",
- f"{accel_g[1]:+.4f}",
- f"{accel_g[2]:+.4f}",
- f"{gyro_deg_s[0]:+.4f}",
- f"{gyro_deg_s[1]:+.4f}",
- f"{gyro_deg_s[2]:+.4f}",
- f"{temperature_c:.2f}",
- ]
-
- body = "$GPIMU," + ",".join(fields)
- checksum = nmea_checksum(body)
- sentence = f"{body}*{checksum}\r\n"
- return sentence.encode("ascii")
+ buf = bytearray(IM23A_IMU_HEADER)
+ _write_double(buf, _seconds_of_day(timestamp))
+ _write_float(buf, accel_g[0])
+ _write_float(buf, accel_g[1])
+ _write_float(buf, accel_g[2])
+ _write_float(buf, gyro_deg_s[0])
+ _write_float(buf, gyro_deg_s[1])
+ _write_float(buf, gyro_deg_s[2])
+ _write_float(buf, reserves[0])
+ _write_float(buf, reserves[1])
+ _write_float(buf, reserves[2])
+ _append_tail(buf)
+ return bytes(buf)
-def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
- ratio = _clamp((value - center) / span, -1.0, 1.0)
- return ratio * max_speed
-
-
-def _clamp(val: float, min_val: float, max_val: float) -> float:
- return max(min_val, min(max_val, val))
+def _seconds_of_day(dt: datetime) -> float:
+ utc = _ensure_utc(dt)
+ return (
+ utc.hour * 3600
+ + utc.minute * 60
+ + utc.second
+ + utc.microsecond / 1_000_000.0
+ )
def _decode_payload(payload: bytes) -> Tuple[float, float]:
@@ -147,7 +182,7 @@
if len(payload) == 4:
steering_pwm, throttle_pwm = struct.unpack("<HH", payload)
# 閲嶆柊鏄犲皠锛氳浆鍚� PWM 鈫� yaw rate锛涙补闂� PWM 鈫� 绾块�熷害
- forward = _pwm_to_velocity(throttle_pwm)
+ forward = _pwm_to_velocity(throttle_pwm, max_speed=1.0)
turn = math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
return forward, turn
raise ValueError(f"涓嶆敮鎸佺殑鎺у埗璐熻浇闀垮害: {len(payload)}")
@@ -159,6 +194,58 @@
turn: float
+@dataclass
+class PythonAsciiMessage:
+ tag: str
+ fields: list[str]
+
+
+@dataclass
+class ControlStatus:
+ forward_mps: float
+ turn_rate: float
+ freq_hz: float
+ steering_pwm: int
+ throttle_pwm: int
+ stage: str
+ timestamp_ms: float
+ east: float
+ north: float
+ up: float
+ heading_deg: float
+ target_heading_deg: float
+ target_east: float
+ target_north: float
+
+
+@dataclass
+class PoseStatus:
+ east: float
+ north: float
+ up: float
+ heading_deg: float
+ pitch_deg: float
+ roll_deg: float
+ target_east: float
+ target_north: float
+ timestamp_ms: float
+
+
+@dataclass
+class StateStatus:
+ stage: str
+ cross_track_error: float
+ heading_error_deg: float
+ timestamp_ms: float
+
+
+@dataclass
+class StackStatus:
+ task_name: str
+ stack_high_water: int
+ heap_free_bytes: int
+
+
class PythonLinkDecoder:
"""
瑙f瀽 PythonLink 鎺у埗甯� (0xAA 0x55 ... 0D 0A)銆�
@@ -227,3 +314,132 @@
return PythonLinkFrame(forward=forward, turn=turn)
+def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]:
+ if not line:
+ return None
+ line = line.strip()
+ if not line.startswith("$"):
+ return None
+ checksum_str = ""
+ data_end = len(line)
+ star_idx = line.find("*")
+ if star_idx != -1:
+ data_end = star_idx
+ checksum_str = line[star_idx + 1 : star_idx + 3]
+ payload = line[1:data_end]
+ calc = 0
+ for ch in payload:
+ calc ^= ord(ch)
+ if checksum_str:
+ try:
+ provided = int(checksum_str, 16)
+ except ValueError:
+ return None
+ if provided != calc:
+ return None
+ parts = payload.split(",")
+ if not parts:
+ return None
+ tag = parts[0]
+ fields = parts[1:]
+ return PythonAsciiMessage(tag=tag, fields=fields)
+
+
+def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]:
+ if msg.tag.upper() != "CTRL" or len(msg.fields) < 14:
+ return None
+ try:
+ forward = float(msg.fields[0])
+ turn = float(msg.fields[1])
+ freq = float(msg.fields[2])
+ steering = int(float(msg.fields[3]))
+ throttle = int(float(msg.fields[4]))
+ stage = msg.fields[5]
+ timestamp = float(msg.fields[6])
+ east = float(msg.fields[7])
+ north = float(msg.fields[8])
+ up = float(msg.fields[9])
+ heading = float(msg.fields[10])
+ target_heading = float(msg.fields[11])
+ target_e = float(msg.fields[12])
+ target_n = float(msg.fields[13])
+ except ValueError:
+ return None
+ return ControlStatus(
+ forward_mps=forward,
+ turn_rate=turn,
+ freq_hz=freq,
+ steering_pwm=steering,
+ throttle_pwm=throttle,
+ stage=stage,
+ timestamp_ms=timestamp,
+ east=east,
+ north=north,
+ up=up,
+ heading_deg=heading,
+ target_heading_deg=target_heading,
+ target_east=target_e,
+ target_north=target_n,
+ )
+
+
+def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]:
+ if msg.tag.upper() != "POSE" or len(msg.fields) < 8:
+ return None
+ try:
+ east = float(msg.fields[0])
+ north = float(msg.fields[1])
+ up = float(msg.fields[2])
+ heading = float(msg.fields[3])
+ pitch = float(msg.fields[4])
+ roll = float(msg.fields[5])
+ target_e = float(msg.fields[6])
+ target_n = float(msg.fields[7])
+ timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0
+ except ValueError:
+ return None
+ return PoseStatus(
+ east=east,
+ north=north,
+ up=up,
+ heading_deg=heading,
+ pitch_deg=pitch,
+ roll_deg=roll,
+ target_east=target_e,
+ target_north=target_n,
+ timestamp_ms=timestamp,
+ )
+
+
+def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]:
+ if msg.tag.upper() != "STATE" or len(msg.fields) < 3:
+ return None
+ stage = msg.fields[0]
+ try:
+ xte = float(msg.fields[1])
+ heading_err = float(msg.fields[2])
+ timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0
+ except ValueError:
+ return None
+ return StateStatus(
+ stage=stage,
+ cross_track_error=xte,
+ heading_error_deg=heading_err,
+ timestamp_ms=timestamp,
+ )
+
+
+def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]:
+ if msg.tag.upper() != "STACK" or len(msg.fields) < 3:
+ return None
+ task = msg.fields[0]
+ try:
+ stack_hw = int(float(msg.fields[1]))
+ heap_free = int(float(msg.fields[2]))
+ except ValueError:
+ return None
+ return StackStatus(task_name=task,
+ stack_high_water=stack_hw,
+ heap_free_bytes=heap_free)
+
+
--
Gitblit v1.10.0