""" 与 STM32H7 交互的协议封装: - 构造 IM23A "fmin"/"fmim" 传感器帧 - 解析 PythonLink 控制帧 - 解析 ASCII 状态报文 """ from __future__ import annotations import struct import math from dataclasses import dataclass from datetime import datetime, timezone from typing import Callable, Optional, Tuple def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float: """ 将 PWM 信号转换为线速度或角速度。 基于 motion_config.h 中的模型参数: Forward: v = K_fwd * (1500 - pwm) + Bias (但模拟器这里主要是为了闭环,不需要完美匹配Bias,只要方向对) 1000 PWM = 0.56 m/s 1500 PWM = 0.0 m/s 2000 PWM = -0.2 m/s (backward) Turn: w = K_turn * (1500 - pwm) 1000 PWM = 63 deg/s (Left) 2000 PWM = -61.5 deg/s (Right) """ if value < 1000: value = 1000 if value > 2000: value = 2000 # 使用参数 # MC_CFG_FORWARD_K = 0.00196 # MC_CFG_FORWARD_BIAS = -0.077 # MC_CFG_STEERING_K_LEFT = 0.00206 # MC_CFG_STEERING_K_RIGHT = 0.00201 # 简单的线性映射用于模拟,忽略死区和偏置的细节,保证大致范围正确即可 # 真实速度 = K * (1500 - pwm) + Bias delta = 1500 - value # 模拟器需要返回速度值,而不是设置速度。 # 这里的 max_speed 参数不再直接作为比例系数,而是用于指示是线速度还是角速度的缩放 if max_speed < 10.0: # 线速度模式 (m/s) # 假设是 Forward PWM # 1000 -> 500 diff -> 0.56 m/s => 0.00112 m/s/us # 2000 -> -500 diff -> -0.56 m/s (实际车可能是0.2,但模拟器可以对称) # 使用平均 K = 0.0015 k_lin = 0.0015 if abs(delta) < 40: return 0.0 # 死区 return delta * k_lin else: # 角速度模式 (deg/s -> rad/s) # 假设是 Steering PWM # 1000 -> 500 diff -> 63 deg/s => 0.126 deg/s/us # 2000 -> -500 diff -> -61.5 deg/s # K_ang ~ 0.12 deg/s/us k_ang_deg = 0.12 if abs(delta) < 10: return 0.0 # 死区 deg_s = delta * k_ang_deg # 转换为 rad/s ? 下面调用是 math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0)) # 所以这里只需要返回 deg/s,调用者会转 rad/s return deg_s IM23A_NAV_LEN = 100 IM23A_IMU_LEN = 52 IM23A_NAV_HEADER = b"fmin" IM23A_IMU_HEADER = b"fmim" IM23A_TAIL = b"ed" def _ensure_utc(dt: datetime) -> datetime: if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) def nmea_checksum(sentence_without_star: str) -> str: cs = 0 for ch in sentence_without_star[1:]: cs ^= ord(ch) return f"{cs:02X}" def _write_double(buf: bytearray, value: float) -> None: buf.extend(struct.pack(" None: buf.extend(struct.pack(" None: buf.extend(struct.pack(" None: buf.extend(struct.pack(" None: checksum = sum(buf) & 0xFFFF buf.extend(struct.pack(" bytes: buf = bytearray(IM23A_NAV_HEADER) _write_double(buf, _seconds_of_day(timestamp)) _write_double(buf, lat_deg) _write_double(buf, lon_deg) _write_double(buf, alt_m) _write_float(buf, north_vel) _write_float(buf, east_vel) _write_float(buf, -up_vel) # IM23A 向下正,MCU 内部转换回向上 _write_float(buf, roll_deg) _write_float(buf, pitch_deg) _write_float(buf, heading_rad) # 航向角:弧度值,范围-3.14到3.14,0度正北方向 _write_float(buf, 0.02) # 定位精度,可根据需要调整 _write_float(buf, accel_bias[0]) _write_float(buf, accel_bias[1]) _write_float(buf, accel_bias[2]) _write_float(buf, gyro_bias[0]) _write_float(buf, gyro_bias[1]) _write_float(buf, gyro_bias[2]) _write_float(buf, temperature_c) _write_u32(buf, status_flags) _append_tail(buf) return bytes(buf) def build_im23a_imu_frame( timestamp: datetime, accel_g: Tuple[float, float, float], gyro_deg_s: Tuple[float, float, float], reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0), ) -> bytes: buf = bytearray(IM23A_IMU_HEADER) _write_double(buf, _seconds_of_day(timestamp)) _write_float(buf, accel_g[0]) _write_float(buf, accel_g[1]) _write_float(buf, accel_g[2]) _write_float(buf, gyro_deg_s[0]) _write_float(buf, gyro_deg_s[1]) _write_float(buf, gyro_deg_s[2]) _write_float(buf, reserves[0]) _write_float(buf, reserves[1]) _write_float(buf, reserves[2]) _append_tail(buf) return bytes(buf) def _seconds_of_day(dt: datetime) -> float: utc = _ensure_utc(dt) return ( utc.hour * 3600 + utc.minute * 60 + utc.second + utc.microsecond / 1_000_000.0 ) def _decode_payload(payload: bytes) -> Tuple[float, float]: if len(payload) == 8: forward, turn = struct.unpack(" 0: del self._buffer[:start] if len(self._buffer) < 9: return frame_len = self._expected_frame_length() if frame_len is None or len(self._buffer) < frame_len: return frame = bytes(self._buffer[:frame_len]) del self._buffer[:frame_len] try: parsed = self._parse_frame(frame) if parsed: self._on_frame(parsed) except Exception: # 丢弃无效帧,继续搜索 continue def _find_header(self) -> int: data = bytes(self._buffer) idx = data.find(self.FRAME_HEADER) return idx def _expected_frame_length(self) -> Optional[int]: if len(self._buffer) < 5: return None payload_len = int.from_bytes(self._buffer[3:5], "little") return 2 + 1 + 2 + payload_len + 2 + 2 # header + type + len + payload + checksum + footer def _parse_frame(self, frame: bytes) -> Optional[PythonLinkFrame]: if not (frame.startswith(self.FRAME_HEADER) and frame.endswith(self.FRAME_FOOTER)): return None frame_type = frame[2] if frame_type != self.TYPE_CONTROL: return None payload_len = int.from_bytes(frame[3:5], "little") payload_start = 5 payload_end = payload_start + payload_len payload = frame[payload_start:payload_end] checksum_received = int.from_bytes(frame[payload_end:payload_end + 2], "little") checksum_calc = sum(frame[2:payload_end]) & 0xFFFF if checksum_received != checksum_calc: raise ValueError("校验和不匹配") forward, turn = _decode_payload(payload) return PythonLinkFrame(forward=forward, turn=turn) def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]: if not line: return None line = line.strip() if not line.startswith("$"): return None checksum_str = "" data_end = len(line) star_idx = line.find("*") if star_idx != -1: data_end = star_idx checksum_str = line[star_idx + 1 : star_idx + 3] payload = line[1:data_end] calc = 0 for ch in payload: calc ^= ord(ch) if checksum_str: try: provided = int(checksum_str, 16) except ValueError: return None if provided != calc: return None parts = payload.split(",") if not parts: return None tag = parts[0] fields = parts[1:] return PythonAsciiMessage(tag=tag, fields=fields) def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]: if msg.tag.upper() != "CTRL" or len(msg.fields) < 14: return None try: forward = float(msg.fields[0]) turn = float(msg.fields[1]) freq = float(msg.fields[2]) steering = int(float(msg.fields[3])) throttle = int(float(msg.fields[4])) stage = msg.fields[5] timestamp = float(msg.fields[6]) east = float(msg.fields[7]) north = float(msg.fields[8]) up = float(msg.fields[9]) heading = float(msg.fields[10]) target_heading = float(msg.fields[11]) target_e = float(msg.fields[12]) target_n = float(msg.fields[13]) except ValueError: return None return ControlStatus( forward_mps=forward, turn_rate=turn, freq_hz=freq, steering_pwm=steering, throttle_pwm=throttle, stage=stage, timestamp_ms=timestamp, east=east, north=north, up=up, heading_deg=heading, target_heading_deg=target_heading, target_east=target_e, target_north=target_n, ) def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]: if msg.tag.upper() != "POSE" or len(msg.fields) < 8: return None try: east = float(msg.fields[0]) north = float(msg.fields[1]) up = float(msg.fields[2]) heading = float(msg.fields[3]) pitch = float(msg.fields[4]) roll = float(msg.fields[5]) target_e = float(msg.fields[6]) target_n = float(msg.fields[7]) timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0 except ValueError: return None return PoseStatus( east=east, north=north, up=up, heading_deg=heading, pitch_deg=pitch, roll_deg=roll, target_east=target_e, target_north=target_n, timestamp_ms=timestamp, ) def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]: if msg.tag.upper() != "STATE" or len(msg.fields) < 3: return None stage = msg.fields[0] try: xte = float(msg.fields[1]) heading_err = float(msg.fields[2]) timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0 except ValueError: return None return StateStatus( stage=stage, cross_track_error=xte, heading_error_deg=heading_err, timestamp_ms=timestamp, ) def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]: if msg.tag.upper() != "STACK" or len(msg.fields) < 3: return None task = msg.fields[0] try: stack_hw = int(float(msg.fields[1])) heap_free = int(float(msg.fields[2])) except ValueError: return None return StackStatus(task_name=task, stack_high_water=stack_hw, heap_free_bytes=heap_free)