| | |
| | | 与 STM32H7 交互的协议封装: |
| | | - 构造 $GPRMI / $GPIMU 句子 |
| | | - 解析 PythonLink 控制帧 |
| | | - 解析 ASCII 状态报文 |
| | | """ |
| | | |
| | | from __future__ import annotations |
| | |
| | | str(int(quality)), |
| | | ] |
| | | |
| | | body = "$GPRMI," + ",".join(fields) |
| | | body = "$GPFMI," + ",".join(fields) |
| | | checksum = nmea_checksum(body) |
| | | sentence = f"{body}*{checksum}\r\n" |
| | | return sentence.encode("ascii") |
| | |
| | | turn: float |
| | | |
| | | |
| | | @dataclass |
| | | class PythonAsciiMessage: |
| | | tag: str |
| | | fields: list[str] |
| | | |
| | | |
| | | @dataclass |
| | | class ControlStatus: |
| | | forward_mps: float |
| | | turn_rate: float |
| | | freq_hz: float |
| | | steering_pwm: int |
| | | throttle_pwm: int |
| | | stage: str |
| | | timestamp_ms: float |
| | | |
| | | |
| | | @dataclass |
| | | class PoseStatus: |
| | | east: float |
| | | north: float |
| | | up: float |
| | | heading_deg: float |
| | | pitch_deg: float |
| | | roll_deg: float |
| | | target_east: float |
| | | target_north: float |
| | | timestamp_ms: float |
| | | |
| | | |
| | | @dataclass |
| | | class StateStatus: |
| | | stage: str |
| | | cross_track_error: float |
| | | heading_error_deg: float |
| | | timestamp_ms: float |
| | | |
| | | |
| | | @dataclass |
| | | class StackStatus: |
| | | task_name: str |
| | | stack_high_water: int |
| | | heap_free_bytes: int |
| | | |
| | | |
| | | class PythonLinkDecoder: |
| | | """ |
| | | 解析 PythonLink 控制帧 (0xAA 0x55 ... 0D 0A)。 |
| | |
| | | return PythonLinkFrame(forward=forward, turn=turn) |
| | | |
| | | |
| | | def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]: |
| | | if not line: |
| | | return None |
| | | line = line.strip() |
| | | if not line.startswith("$"): |
| | | return None |
| | | checksum_str = "" |
| | | data_end = len(line) |
| | | star_idx = line.find("*") |
| | | if star_idx != -1: |
| | | data_end = star_idx |
| | | checksum_str = line[star_idx + 1 : star_idx + 3] |
| | | payload = line[1:data_end] |
| | | calc = 0 |
| | | for ch in payload: |
| | | calc ^= ord(ch) |
| | | if checksum_str: |
| | | try: |
| | | provided = int(checksum_str, 16) |
| | | except ValueError: |
| | | return None |
| | | if provided != calc: |
| | | return None |
| | | parts = payload.split(",") |
| | | if not parts: |
| | | return None |
| | | tag = parts[0] |
| | | fields = parts[1:] |
| | | return PythonAsciiMessage(tag=tag, fields=fields) |
| | | |
| | | |
| | | def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]: |
| | | if msg.tag.upper() != "CTRL" or len(msg.fields) < 6: |
| | | return None |
| | | try: |
| | | forward = float(msg.fields[0]) |
| | | turn = float(msg.fields[1]) |
| | | freq = float(msg.fields[2]) |
| | | steering = int(float(msg.fields[3])) |
| | | throttle = int(float(msg.fields[4])) |
| | | stage = msg.fields[5] |
| | | timestamp = float(msg.fields[6]) if len(msg.fields) > 6 else 0.0 |
| | | except ValueError: |
| | | return None |
| | | return ControlStatus( |
| | | forward_mps=forward, |
| | | turn_rate=turn, |
| | | freq_hz=freq, |
| | | steering_pwm=steering, |
| | | throttle_pwm=throttle, |
| | | stage=stage, |
| | | timestamp_ms=timestamp, |
| | | ) |
| | | |
| | | |
| | | def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]: |
| | | if msg.tag.upper() != "POSE" or len(msg.fields) < 8: |
| | | return None |
| | | try: |
| | | east = float(msg.fields[0]) |
| | | north = float(msg.fields[1]) |
| | | up = float(msg.fields[2]) |
| | | heading = float(msg.fields[3]) |
| | | pitch = float(msg.fields[4]) |
| | | roll = float(msg.fields[5]) |
| | | target_e = float(msg.fields[6]) |
| | | target_n = float(msg.fields[7]) |
| | | timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0 |
| | | except ValueError: |
| | | return None |
| | | return PoseStatus( |
| | | east=east, |
| | | north=north, |
| | | up=up, |
| | | heading_deg=heading, |
| | | pitch_deg=pitch, |
| | | roll_deg=roll, |
| | | target_east=target_e, |
| | | target_north=target_n, |
| | | timestamp_ms=timestamp, |
| | | ) |
| | | |
| | | |
| | | def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]: |
| | | if msg.tag.upper() != "STATE" or len(msg.fields) < 3: |
| | | return None |
| | | stage = msg.fields[0] |
| | | try: |
| | | xte = float(msg.fields[1]) |
| | | heading_err = float(msg.fields[2]) |
| | | timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0 |
| | | except ValueError: |
| | | return None |
| | | return StateStatus( |
| | | stage=stage, |
| | | cross_track_error=xte, |
| | | heading_error_deg=heading_err, |
| | | timestamp_ms=timestamp, |
| | | ) |
| | | |
| | | |
| | | def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]: |
| | | if msg.tag.upper() != "STACK" or len(msg.fields) < 3: |
| | | return None |
| | | task = msg.fields[0] |
| | | try: |
| | | stack_hw = int(float(msg.fields[1])) |
| | | heap_free = int(float(msg.fields[2])) |
| | | except ValueError: |
| | | return None |
| | | return StackStatus(task_name=task, |
| | | stack_high_water=stack_hw, |
| | | heap_free_bytes=heap_free) |
| | | |
| | | |