""" 与 STM32H7 交互的协议封装: - 构造 $GPRMI / $GPIMU 句子 - 解析 PythonLink 控制帧 - 解析 ASCII 状态报文 """ from __future__ import annotations import math import struct from dataclasses import dataclass from datetime import datetime, timezone from typing import Callable, Optional, Tuple GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc) def _ensure_utc(dt: datetime) -> datetime: if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) def gps_week_and_tow(timestamp: datetime) -> Tuple[int, float]: dt = _ensure_utc(timestamp) delta = dt - GPS_EPOCH total_seconds = delta.total_seconds() week = int(total_seconds // 604800) tow = total_seconds - week * 604800 return week, tow def format_hhmmss(timestamp: datetime, decimals: int = 3) -> str: dt = _ensure_utc(timestamp) base = dt.strftime("%H%M%S") frac = dt.microsecond / 1_000_000.0 frac_str = f"{frac:.{decimals}f}".split(".")[1] return f"{base}.{frac_str}" def nmea_checksum(sentence_without_star: str) -> str: cs = 0 for ch in sentence_without_star[1:]: # 跳过 $ cs ^= ord(ch) return f"{cs:02X}" def build_gprmi_sentence( timestamp: datetime, lat_deg: float, lon_deg: float, alt_m: float, east_vel: float, north_vel: float, up_vel: float, heading_deg: float, pitch_deg: float, roll_deg: float, *, lat_std: float = 0.008, lon_std: float = 0.008, alt_std: float = 0.02, vel_std: float = 0.02, heading_std: float = 0.1, pitch_std: float = 0.1, roll_std: float = 0.1, baseline_m: float = 1.0, sat_count: int = 20, ambiguity_count: int = 18, quality: int = 4, ) -> bytes: timestamp = _ensure_utc(timestamp) utc_str = format_hhmmss(timestamp, decimals=2) week, tow = gps_week_and_tow(timestamp) fields = [ utc_str, str(week), f"{tow:.3f}", f"{lat_deg:.9f}", f"{lon_deg:.9f}", f"{alt_m:.3f}", f"{lat_std:.3f}", f"{lon_std:.3f}", f"{alt_std:.3f}", f"{east_vel:.3f}", f"{north_vel:.3f}", f"{up_vel:.3f}", f"{vel_std:.3f}", f"{heading_deg:.3f}", f"{pitch_deg:.3f}", f"{roll_deg:.3f}", f"{heading_std:.3f}", f"{pitch_std:.3f}", f"{roll_std:.3f}", f"{baseline_m:.3f}", str(int(sat_count)), str(int(ambiguity_count)), str(int(quality)), ] body = "$GPFMI," + ",".join(fields) checksum = nmea_checksum(body) sentence = f"{body}*{checksum}\r\n" return sentence.encode("ascii") def build_gpimu_sentence( timestamp: datetime, accel_g: Tuple[float, float, float], gyro_deg_s: Tuple[float, float, float], temperature_c: float, ) -> bytes: timestamp = _ensure_utc(timestamp) utc_str = format_hhmmss(timestamp, decimals=3) fields = [ utc_str, f"{accel_g[0]:+.4f}", f"{accel_g[1]:+.4f}", f"{accel_g[2]:+.4f}", f"{gyro_deg_s[0]:+.4f}", f"{gyro_deg_s[1]:+.4f}", f"{gyro_deg_s[2]:+.4f}", f"{temperature_c:.2f}", ] body = "$GPIMU," + ",".join(fields) checksum = nmea_checksum(body) sentence = f"{body}*{checksum}\r\n" return sentence.encode("ascii") def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float: ratio = _clamp((value - center) / span, -1.0, 1.0) return ratio * max_speed def _clamp(val: float, min_val: float, max_val: float) -> float: return max(min_val, min(max_val, val)) def _decode_payload(payload: bytes) -> Tuple[float, float]: if len(payload) == 8: forward, turn = struct.unpack(" 0: del self._buffer[:start] if len(self._buffer) < 9: return frame_len = self._expected_frame_length() if frame_len is None or len(self._buffer) < frame_len: return frame = bytes(self._buffer[:frame_len]) del self._buffer[:frame_len] try: parsed = self._parse_frame(frame) if parsed: self._on_frame(parsed) except Exception: # 丢弃无效帧,继续搜索 continue def _find_header(self) -> int: data = bytes(self._buffer) idx = data.find(self.FRAME_HEADER) return idx def _expected_frame_length(self) -> Optional[int]: if len(self._buffer) < 5: return None payload_len = int.from_bytes(self._buffer[3:5], "little") return 2 + 1 + 2 + payload_len + 2 + 2 # header + type + len + payload + checksum + footer def _parse_frame(self, frame: bytes) -> Optional[PythonLinkFrame]: if not (frame.startswith(self.FRAME_HEADER) and frame.endswith(self.FRAME_FOOTER)): return None frame_type = frame[2] if frame_type != self.TYPE_CONTROL: return None payload_len = int.from_bytes(frame[3:5], "little") payload_start = 5 payload_end = payload_start + payload_len payload = frame[payload_start:payload_end] checksum_received = int.from_bytes(frame[payload_end:payload_end + 2], "little") checksum_calc = sum(frame[2:payload_end]) & 0xFFFF if checksum_received != checksum_calc: raise ValueError("校验和不匹配") forward, turn = _decode_payload(payload) return PythonLinkFrame(forward=forward, turn=turn) def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]: if not line: return None line = line.strip() if not line.startswith("$"): return None checksum_str = "" data_end = len(line) star_idx = line.find("*") if star_idx != -1: data_end = star_idx checksum_str = line[star_idx + 1 : star_idx + 3] payload = line[1:data_end] calc = 0 for ch in payload: calc ^= ord(ch) if checksum_str: try: provided = int(checksum_str, 16) except ValueError: return None if provided != calc: return None parts = payload.split(",") if not parts: return None tag = parts[0] fields = parts[1:] return PythonAsciiMessage(tag=tag, fields=fields) def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]: if msg.tag.upper() != "CTRL" or len(msg.fields) < 6: return None try: forward = float(msg.fields[0]) turn = float(msg.fields[1]) freq = float(msg.fields[2]) steering = int(float(msg.fields[3])) throttle = int(float(msg.fields[4])) stage = msg.fields[5] timestamp = float(msg.fields[6]) if len(msg.fields) > 6 else 0.0 except ValueError: return None return ControlStatus( forward_mps=forward, turn_rate=turn, freq_hz=freq, steering_pwm=steering, throttle_pwm=throttle, stage=stage, timestamp_ms=timestamp, ) def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]: if msg.tag.upper() != "POSE" or len(msg.fields) < 8: return None try: east = float(msg.fields[0]) north = float(msg.fields[1]) up = float(msg.fields[2]) heading = float(msg.fields[3]) pitch = float(msg.fields[4]) roll = float(msg.fields[5]) target_e = float(msg.fields[6]) target_n = float(msg.fields[7]) timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0 except ValueError: return None return PoseStatus( east=east, north=north, up=up, heading_deg=heading, pitch_deg=pitch, roll_deg=roll, target_east=target_e, target_north=target_n, timestamp_ms=timestamp, ) def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]: if msg.tag.upper() != "STATE" or len(msg.fields) < 3: return None stage = msg.fields[0] try: xte = float(msg.fields[1]) heading_err = float(msg.fields[2]) timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0 except ValueError: return None return StateStatus( stage=stage, cross_track_error=xte, heading_error_deg=heading_err, timestamp_ms=timestamp, ) def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]: if msg.tag.upper() != "STACK" or len(msg.fields) < 3: return None task = msg.fields[0] try: stack_hw = int(float(msg.fields[1])) heap_free = int(float(msg.fields[2])) except ValueError: return None return StackStatus(task_name=task, stack_high_water=stack_hw, heap_free_bytes=heap_free)