yincheng.zhong
2025-11-27 1ebe8cd1247982a2c9d1d75b9c72d214eed4d581
python/hitl/protocols.py
@@ -2,6 +2,7 @@
与 STM32H7 交互的协议封装:
- 构造 $GPRMI / $GPIMU 句子
- 解析 PythonLink 控制帧
- 解析 ASCII 状态报文
"""
from __future__ import annotations
@@ -99,7 +100,7 @@
        str(int(quality)),
    ]
    body = "$GPRMI," + ",".join(fields)
    body = "$GPFMI," + ",".join(fields)
    checksum = nmea_checksum(body)
    sentence = f"{body}*{checksum}\r\n"
    return sentence.encode("ascii")
@@ -159,6 +160,58 @@
    turn: float
@dataclass
class PythonAsciiMessage:
    tag: str
    fields: list[str]
@dataclass
class ControlStatus:
    forward_mps: float
    turn_rate: float
    freq_hz: float
    steering_pwm: int
    throttle_pwm: int
    stage: str
    timestamp_ms: float
    east: float
    north: float
    up: float
    heading_deg: float
    target_heading_deg: float
    target_east: float
    target_north: float
@dataclass
class PoseStatus:
    east: float
    north: float
    up: float
    heading_deg: float
    pitch_deg: float
    roll_deg: float
    target_east: float
    target_north: float
    timestamp_ms: float
@dataclass
class StateStatus:
    stage: str
    cross_track_error: float
    heading_error_deg: float
    timestamp_ms: float
@dataclass
class StackStatus:
    task_name: str
    stack_high_water: int
    heap_free_bytes: int
class PythonLinkDecoder:
    """
    解析 PythonLink 控制帧 (0xAA 0x55 ... 0D 0A)。
@@ -227,3 +280,132 @@
        return PythonLinkFrame(forward=forward, turn=turn)
def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]:
    if not line:
        return None
    line = line.strip()
    if not line.startswith("$"):
        return None
    checksum_str = ""
    data_end = len(line)
    star_idx = line.find("*")
    if star_idx != -1:
        data_end = star_idx
        checksum_str = line[star_idx + 1 : star_idx + 3]
    payload = line[1:data_end]
    calc = 0
    for ch in payload:
        calc ^= ord(ch)
    if checksum_str:
        try:
            provided = int(checksum_str, 16)
        except ValueError:
            return None
        if provided != calc:
            return None
    parts = payload.split(",")
    if not parts:
        return None
    tag = parts[0]
    fields = parts[1:]
    return PythonAsciiMessage(tag=tag, fields=fields)
def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]:
    if msg.tag.upper() != "CTRL" or len(msg.fields) < 14:
        return None
    try:
        forward = float(msg.fields[0])
        turn = float(msg.fields[1])
        freq = float(msg.fields[2])
        steering = int(float(msg.fields[3]))
        throttle = int(float(msg.fields[4]))
        stage = msg.fields[5]
        timestamp = float(msg.fields[6])
        east = float(msg.fields[7])
        north = float(msg.fields[8])
        up = float(msg.fields[9])
        heading = float(msg.fields[10])
        target_heading = float(msg.fields[11])
        target_e = float(msg.fields[12])
        target_n = float(msg.fields[13])
    except ValueError:
        return None
    return ControlStatus(
        forward_mps=forward,
        turn_rate=turn,
        freq_hz=freq,
        steering_pwm=steering,
        throttle_pwm=throttle,
        stage=stage,
        timestamp_ms=timestamp,
        east=east,
        north=north,
        up=up,
        heading_deg=heading,
        target_heading_deg=target_heading,
        target_east=target_e,
        target_north=target_n,
    )
def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]:
    if msg.tag.upper() != "POSE" or len(msg.fields) < 8:
        return None
    try:
        east = float(msg.fields[0])
        north = float(msg.fields[1])
        up = float(msg.fields[2])
        heading = float(msg.fields[3])
        pitch = float(msg.fields[4])
        roll = float(msg.fields[5])
        target_e = float(msg.fields[6])
        target_n = float(msg.fields[7])
        timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0
    except ValueError:
        return None
    return PoseStatus(
        east=east,
        north=north,
        up=up,
        heading_deg=heading,
        pitch_deg=pitch,
        roll_deg=roll,
        target_east=target_e,
        target_north=target_n,
        timestamp_ms=timestamp,
    )
def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]:
    if msg.tag.upper() != "STATE" or len(msg.fields) < 3:
        return None
    stage = msg.fields[0]
    try:
        xte = float(msg.fields[1])
        heading_err = float(msg.fields[2])
        timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0
    except ValueError:
        return None
    return StateStatus(
        stage=stage,
        cross_track_error=xte,
        heading_error_deg=heading_err,
        timestamp_ms=timestamp,
    )
def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]:
    if msg.tag.upper() != "STACK" or len(msg.fields) < 3:
        return None
    task = msg.fields[0]
    try:
        stack_hw = int(float(msg.fields[1]))
        heap_free = int(float(msg.fields[2]))
    except ValueError:
        return None
    return StackStatus(task_name=task,
                       stack_high_water=stack_hw,
                       heap_free_bytes=heap_free)