From 212ccb49d3e7c7fa138c5f9d335d0b8c5a08d2a3 Mon Sep 17 00:00:00 2001
From: yincheng.zhong <634916154@qq.com>
Date: 星期日, 23 十一月 2025 22:21:39 +0800
Subject: [PATCH] GUI和LOG初步调通,进行下一阶段,验证仿真的数据准确性,GPS的

---
 python/hitl/protocols.py |  163 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 162 insertions(+), 1 deletions(-)

diff --git a/python/hitl/protocols.py b/python/hitl/protocols.py
index 0c9e898..14f46a0 100644
--- a/python/hitl/protocols.py
+++ b/python/hitl/protocols.py
@@ -2,6 +2,7 @@
 涓� STM32H7 浜や簰鐨勫崗璁皝瑁咃細
 - 鏋勯�� $GPRMI / $GPIMU 鍙ュ瓙
 - 瑙f瀽 PythonLink 鎺у埗甯�
+- 瑙f瀽 ASCII 鐘舵�佹姤鏂�
 """
 
 from __future__ import annotations
@@ -99,7 +100,7 @@
         str(int(quality)),
     ]
 
-    body = "$GPRMI," + ",".join(fields)
+    body = "$GPFMI," + ",".join(fields)
     checksum = nmea_checksum(body)
     sentence = f"{body}*{checksum}\r\n"
     return sentence.encode("ascii")
@@ -159,6 +160,51 @@
     turn: float
 
 
+@dataclass
+class PythonAsciiMessage:
+    tag: str
+    fields: list[str]
+
+
+@dataclass
+class ControlStatus:
+    forward_mps: float
+    turn_rate: float
+    freq_hz: float
+    steering_pwm: int
+    throttle_pwm: int
+    stage: str
+    timestamp_ms: float
+
+
+@dataclass
+class PoseStatus:
+    east: float
+    north: float
+    up: float
+    heading_deg: float
+    pitch_deg: float
+    roll_deg: float
+    target_east: float
+    target_north: float
+    timestamp_ms: float
+
+
+@dataclass
+class StateStatus:
+    stage: str
+    cross_track_error: float
+    heading_error_deg: float
+    timestamp_ms: float
+
+
+@dataclass
+class StackStatus:
+    task_name: str
+    stack_high_water: int
+    heap_free_bytes: int
+
+
 class PythonLinkDecoder:
     """
     瑙f瀽 PythonLink 鎺у埗甯� (0xAA 0x55 ... 0D 0A)銆�
@@ -227,3 +273,118 @@
         return PythonLinkFrame(forward=forward, turn=turn)
 
 
+def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]:
+    if not line:
+        return None
+    line = line.strip()
+    if not line.startswith("$"):
+        return None
+    checksum_str = ""
+    data_end = len(line)
+    star_idx = line.find("*")
+    if star_idx != -1:
+        data_end = star_idx
+        checksum_str = line[star_idx + 1 : star_idx + 3]
+    payload = line[1:data_end]
+    calc = 0
+    for ch in payload:
+        calc ^= ord(ch)
+    if checksum_str:
+        try:
+            provided = int(checksum_str, 16)
+        except ValueError:
+            return None
+        if provided != calc:
+            return None
+    parts = payload.split(",")
+    if not parts:
+        return None
+    tag = parts[0]
+    fields = parts[1:]
+    return PythonAsciiMessage(tag=tag, fields=fields)
+
+
+def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]:
+    if msg.tag.upper() != "CTRL" or len(msg.fields) < 6:
+        return None
+    try:
+        forward = float(msg.fields[0])
+        turn = float(msg.fields[1])
+        freq = float(msg.fields[2])
+        steering = int(float(msg.fields[3]))
+        throttle = int(float(msg.fields[4]))
+        stage = msg.fields[5]
+        timestamp = float(msg.fields[6]) if len(msg.fields) > 6 else 0.0
+    except ValueError:
+        return None
+    return ControlStatus(
+        forward_mps=forward,
+        turn_rate=turn,
+        freq_hz=freq,
+        steering_pwm=steering,
+        throttle_pwm=throttle,
+        stage=stage,
+        timestamp_ms=timestamp,
+    )
+
+
+def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]:
+    if msg.tag.upper() != "POSE" or len(msg.fields) < 8:
+        return None
+    try:
+        east = float(msg.fields[0])
+        north = float(msg.fields[1])
+        up = float(msg.fields[2])
+        heading = float(msg.fields[3])
+        pitch = float(msg.fields[4])
+        roll = float(msg.fields[5])
+        target_e = float(msg.fields[6])
+        target_n = float(msg.fields[7])
+        timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0
+    except ValueError:
+        return None
+    return PoseStatus(
+        east=east,
+        north=north,
+        up=up,
+        heading_deg=heading,
+        pitch_deg=pitch,
+        roll_deg=roll,
+        target_east=target_e,
+        target_north=target_n,
+        timestamp_ms=timestamp,
+    )
+
+
+def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]:
+    if msg.tag.upper() != "STATE" or len(msg.fields) < 3:
+        return None
+    stage = msg.fields[0]
+    try:
+        xte = float(msg.fields[1])
+        heading_err = float(msg.fields[2])
+        timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0
+    except ValueError:
+        return None
+    return StateStatus(
+        stage=stage,
+        cross_track_error=xte,
+        heading_error_deg=heading_err,
+        timestamp_ms=timestamp,
+    )
+
+
+def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]:
+    if msg.tag.upper() != "STACK" or len(msg.fields) < 3:
+        return None
+    task = msg.fields[0]
+    try:
+        stack_hw = int(float(msg.fields[1]))
+        heap_free = int(float(msg.fields[2]))
+    except ValueError:
+        return None
+    return StackStatus(task_name=task,
+                       stack_high_water=stack_hw,
+                       heap_free_bytes=heap_free)
+
+

--
Gitblit v1.10.0