From 212ccb49d3e7c7fa138c5f9d335d0b8c5a08d2a3 Mon Sep 17 00:00:00 2001
From: yincheng.zhong <634916154@qq.com>
Date: 星期日, 23 十一月 2025 22:21:39 +0800
Subject: [PATCH] GUI和LOG初步调通,进行下一阶段,验证仿真的数据准确性,GPS的
---
python/hitl/protocols.py | 163 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 162 insertions(+), 1 deletions(-)
diff --git a/python/hitl/protocols.py b/python/hitl/protocols.py
index 0c9e898..14f46a0 100644
--- a/python/hitl/protocols.py
+++ b/python/hitl/protocols.py
@@ -2,6 +2,7 @@
涓� STM32H7 浜や簰鐨勫崗璁皝瑁咃細
- 鏋勯�� $GPRMI / $GPIMU 鍙ュ瓙
- 瑙f瀽 PythonLink 鎺у埗甯�
+- 瑙f瀽 ASCII 鐘舵�佹姤鏂�
"""
from __future__ import annotations
@@ -99,7 +100,7 @@
str(int(quality)),
]
- body = "$GPRMI," + ",".join(fields)
+ body = "$GPFMI," + ",".join(fields)
checksum = nmea_checksum(body)
sentence = f"{body}*{checksum}\r\n"
return sentence.encode("ascii")
@@ -159,6 +160,51 @@
turn: float
+@dataclass
+class PythonAsciiMessage:
+ tag: str
+ fields: list[str]
+
+
+@dataclass
+class ControlStatus:
+ forward_mps: float
+ turn_rate: float
+ freq_hz: float
+ steering_pwm: int
+ throttle_pwm: int
+ stage: str
+ timestamp_ms: float
+
+
+@dataclass
+class PoseStatus:
+ east: float
+ north: float
+ up: float
+ heading_deg: float
+ pitch_deg: float
+ roll_deg: float
+ target_east: float
+ target_north: float
+ timestamp_ms: float
+
+
+@dataclass
+class StateStatus:
+ stage: str
+ cross_track_error: float
+ heading_error_deg: float
+ timestamp_ms: float
+
+
+@dataclass
+class StackStatus:
+ task_name: str
+ stack_high_water: int
+ heap_free_bytes: int
+
+
class PythonLinkDecoder:
"""
瑙f瀽 PythonLink 鎺у埗甯� (0xAA 0x55 ... 0D 0A)銆�
@@ -227,3 +273,118 @@
return PythonLinkFrame(forward=forward, turn=turn)
+def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]:
+ if not line:
+ return None
+ line = line.strip()
+ if not line.startswith("$"):
+ return None
+ checksum_str = ""
+ data_end = len(line)
+ star_idx = line.find("*")
+ if star_idx != -1:
+ data_end = star_idx
+ checksum_str = line[star_idx + 1 : star_idx + 3]
+ payload = line[1:data_end]
+ calc = 0
+ for ch in payload:
+ calc ^= ord(ch)
+ if checksum_str:
+ try:
+ provided = int(checksum_str, 16)
+ except ValueError:
+ return None
+ if provided != calc:
+ return None
+ parts = payload.split(",")
+ if not parts:
+ return None
+ tag = parts[0]
+ fields = parts[1:]
+ return PythonAsciiMessage(tag=tag, fields=fields)
+
+
+def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]:
+ if msg.tag.upper() != "CTRL" or len(msg.fields) < 6:
+ return None
+ try:
+ forward = float(msg.fields[0])
+ turn = float(msg.fields[1])
+ freq = float(msg.fields[2])
+ steering = int(float(msg.fields[3]))
+ throttle = int(float(msg.fields[4]))
+ stage = msg.fields[5]
+ timestamp = float(msg.fields[6]) if len(msg.fields) > 6 else 0.0
+ except ValueError:
+ return None
+ return ControlStatus(
+ forward_mps=forward,
+ turn_rate=turn,
+ freq_hz=freq,
+ steering_pwm=steering,
+ throttle_pwm=throttle,
+ stage=stage,
+ timestamp_ms=timestamp,
+ )
+
+
+def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]:
+ if msg.tag.upper() != "POSE" or len(msg.fields) < 8:
+ return None
+ try:
+ east = float(msg.fields[0])
+ north = float(msg.fields[1])
+ up = float(msg.fields[2])
+ heading = float(msg.fields[3])
+ pitch = float(msg.fields[4])
+ roll = float(msg.fields[5])
+ target_e = float(msg.fields[6])
+ target_n = float(msg.fields[7])
+ timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0
+ except ValueError:
+ return None
+ return PoseStatus(
+ east=east,
+ north=north,
+ up=up,
+ heading_deg=heading,
+ pitch_deg=pitch,
+ roll_deg=roll,
+ target_east=target_e,
+ target_north=target_n,
+ timestamp_ms=timestamp,
+ )
+
+
+def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]:
+ if msg.tag.upper() != "STATE" or len(msg.fields) < 3:
+ return None
+ stage = msg.fields[0]
+ try:
+ xte = float(msg.fields[1])
+ heading_err = float(msg.fields[2])
+ timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0
+ except ValueError:
+ return None
+ return StateStatus(
+ stage=stage,
+ cross_track_error=xte,
+ heading_error_deg=heading_err,
+ timestamp_ms=timestamp,
+ )
+
+
+def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]:
+ if msg.tag.upper() != "STACK" or len(msg.fields) < 3:
+ return None
+ task = msg.fields[0]
+ try:
+ stack_hw = int(float(msg.fields[1]))
+ heap_free = int(float(msg.fields[2]))
+ except ValueError:
+ return None
+ return StackStatus(task_name=task,
+ stack_high_water=stack_hw,
+ heap_free_bytes=heap_free)
+
+
--
Gitblit v1.10.0