| | |
| | | """ |
| | | 与 STM32H7 交互的协议封装: |
| | | - 构造 $GPRMI / $GPIMU 句子 |
| | | - 构造 IM23A "fmin"/"fmim" 传感器帧 |
| | | - 解析 PythonLink 控制帧 |
| | | - 解析 ASCII 状态报文 |
| | | """ |
| | | |
| | | from __future__ import annotations |
| | | |
| | | import math |
| | | import struct |
| | | import math |
| | | from dataclasses import dataclass |
| | | from datetime import datetime, timezone |
| | | from typing import Callable, Optional, Tuple |
| | | def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float: |
| | | """ |
| | | 将 PWM 信号转换为线速度或角速度。 |
| | | 基于 motion_config.h 中的模型参数: |
| | | Forward: v = K_fwd * (1500 - pwm) + Bias (但模拟器这里主要是为了闭环,不需要完美匹配Bias,只要方向对) |
| | | 1000 PWM = 0.56 m/s |
| | | 1500 PWM = 0.0 m/s |
| | | 2000 PWM = -0.2 m/s (backward) |
| | | Turn: w = K_turn * (1500 - pwm) |
| | | 1000 PWM = 63 deg/s (Left) |
| | | 2000 PWM = -61.5 deg/s (Right) |
| | | """ |
| | | if value < 1000: value = 1000 |
| | | if value > 2000: value = 2000 |
| | | |
| | | # 使用参数 |
| | | # MC_CFG_FORWARD_K = 0.00196 |
| | | # MC_CFG_FORWARD_BIAS = -0.077 |
| | | # MC_CFG_STEERING_K_LEFT = 0.00206 |
| | | # MC_CFG_STEERING_K_RIGHT = 0.00201 |
| | | |
| | | # 简单的线性映射用于模拟,忽略死区和偏置的细节,保证大致范围正确即可 |
| | | # 真实速度 = K * (1500 - pwm) + Bias |
| | | |
| | | delta = 1500 - value |
| | | # 模拟器需要返回速度值,而不是设置速度。 |
| | | # 这里的 max_speed 参数不再直接作为比例系数,而是用于指示是线速度还是角速度的缩放 |
| | | |
| | | if max_speed < 10.0: # 线速度模式 (m/s) |
| | | # 假设是 Forward PWM |
| | | # 1000 -> 500 diff -> 0.56 m/s => 0.00112 m/s/us |
| | | # 2000 -> -500 diff -> -0.56 m/s (实际车可能是0.2,但模拟器可以对称) |
| | | # 使用平均 K = 0.0015 |
| | | k_lin = 0.0015 |
| | | if abs(delta) < 40: return 0.0 # 死区 |
| | | return delta * k_lin |
| | | else: # 角速度模式 (deg/s -> rad/s) |
| | | # 假设是 Steering PWM |
| | | # 1000 -> 500 diff -> 63 deg/s => 0.126 deg/s/us |
| | | # 2000 -> -500 diff -> -61.5 deg/s |
| | | # K_ang ~ 0.12 deg/s/us |
| | | k_ang_deg = 0.12 |
| | | if abs(delta) < 10: return 0.0 # 死区 |
| | | deg_s = delta * k_ang_deg |
| | | # 转换为 rad/s ? 下面调用是 math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0)) |
| | | # 所以这里只需要返回 deg/s,调用者会转 rad/s |
| | | return deg_s |
| | | |
| | | GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc) |
| | | |
| | | |
| | | IM23A_NAV_LEN = 100 |
| | | IM23A_IMU_LEN = 52 |
| | | IM23A_NAV_HEADER = b"fmin" |
| | | IM23A_IMU_HEADER = b"fmim" |
| | | IM23A_TAIL = b"ed" |
| | | |
| | | |
| | | def _ensure_utc(dt: datetime) -> datetime: |
| | |
| | | return dt.astimezone(timezone.utc) |
| | | |
| | | |
| | | def gps_week_and_tow(timestamp: datetime) -> Tuple[int, float]: |
| | | dt = _ensure_utc(timestamp) |
| | | delta = dt - GPS_EPOCH |
| | | total_seconds = delta.total_seconds() |
| | | week = int(total_seconds // 604800) |
| | | tow = total_seconds - week * 604800 |
| | | return week, tow |
| | | |
| | | |
| | | def format_hhmmss(timestamp: datetime, decimals: int = 3) -> str: |
| | | dt = _ensure_utc(timestamp) |
| | | base = dt.strftime("%H%M%S") |
| | | frac = dt.microsecond / 1_000_000.0 |
| | | frac_str = f"{frac:.{decimals}f}".split(".")[1] |
| | | return f"{base}.{frac_str}" |
| | | |
| | | |
| | | def nmea_checksum(sentence_without_star: str) -> str: |
| | | cs = 0 |
| | | for ch in sentence_without_star[1:]: # 跳过 $ |
| | | for ch in sentence_without_star[1:]: |
| | | cs ^= ord(ch) |
| | | return f"{cs:02X}" |
| | | |
| | | |
| | | def build_gprmi_sentence( |
| | | def _write_double(buf: bytearray, value: float) -> None: |
| | | buf.extend(struct.pack("<d", float(value))) |
| | | |
| | | |
| | | def _write_float(buf: bytearray, value: float) -> None: |
| | | buf.extend(struct.pack("<f", float(value))) |
| | | |
| | | |
| | | def _write_u32(buf: bytearray, value: int) -> None: |
| | | buf.extend(struct.pack("<I", int(value))) |
| | | |
| | | |
| | | def _write_u16(buf: bytearray, value: int) -> None: |
| | | buf.extend(struct.pack("<H", int(value))) |
| | | |
| | | |
| | | def _append_tail(buf: bytearray) -> None: |
| | | checksum = sum(buf) & 0xFFFF |
| | | buf.extend(struct.pack("<H", checksum)) |
| | | buf.extend(IM23A_TAIL) |
| | | |
| | | |
| | | def build_im23a_nav_frame( |
| | | timestamp: datetime, |
| | | lat_deg: float, |
| | | lon_deg: float, |
| | |
| | | east_vel: float, |
| | | north_vel: float, |
| | | up_vel: float, |
| | | heading_deg: float, |
| | | heading_rad: float, |
| | | pitch_deg: float, |
| | | roll_deg: float, |
| | | *, |
| | | lat_std: float = 0.008, |
| | | lon_std: float = 0.008, |
| | | alt_std: float = 0.02, |
| | | vel_std: float = 0.02, |
| | | heading_std: float = 0.1, |
| | | pitch_std: float = 0.1, |
| | | roll_std: float = 0.1, |
| | | baseline_m: float = 1.0, |
| | | sat_count: int = 20, |
| | | ambiguity_count: int = 18, |
| | | quality: int = 4, |
| | | accel_bias: Tuple[float, float, float], |
| | | gyro_bias: Tuple[float, float, float], |
| | | temperature_c: float, |
| | | status_flags: int, |
| | | ) -> bytes: |
| | | timestamp = _ensure_utc(timestamp) |
| | | utc_str = format_hhmmss(timestamp, decimals=2) |
| | | week, tow = gps_week_and_tow(timestamp) |
| | | |
| | | fields = [ |
| | | utc_str, |
| | | str(week), |
| | | f"{tow:.3f}", |
| | | f"{lat_deg:.9f}", |
| | | f"{lon_deg:.9f}", |
| | | f"{alt_m:.3f}", |
| | | f"{lat_std:.3f}", |
| | | f"{lon_std:.3f}", |
| | | f"{alt_std:.3f}", |
| | | f"{east_vel:.3f}", |
| | | f"{north_vel:.3f}", |
| | | f"{up_vel:.3f}", |
| | | f"{vel_std:.3f}", |
| | | f"{heading_deg:.3f}", |
| | | f"{pitch_deg:.3f}", |
| | | f"{roll_deg:.3f}", |
| | | f"{heading_std:.3f}", |
| | | f"{pitch_std:.3f}", |
| | | f"{roll_std:.3f}", |
| | | f"{baseline_m:.3f}", |
| | | str(int(sat_count)), |
| | | str(int(ambiguity_count)), |
| | | str(int(quality)), |
| | | ] |
| | | |
| | | body = "$GPRMI," + ",".join(fields) |
| | | checksum = nmea_checksum(body) |
| | | sentence = f"{body}*{checksum}\r\n" |
| | | return sentence.encode("ascii") |
| | | buf = bytearray(IM23A_NAV_HEADER) |
| | | _write_double(buf, _seconds_of_day(timestamp)) |
| | | _write_double(buf, lat_deg) |
| | | _write_double(buf, lon_deg) |
| | | _write_double(buf, alt_m) |
| | | _write_float(buf, north_vel) |
| | | _write_float(buf, east_vel) |
| | | _write_float(buf, -up_vel) # IM23A 向下正,MCU 内部转换回向上 |
| | | _write_float(buf, roll_deg) |
| | | _write_float(buf, pitch_deg) |
| | | _write_float(buf, heading_rad) # 航向角:弧度值,范围-3.14到3.14,0度正北方向 |
| | | _write_float(buf, 0.02) # 定位精度,可根据需要调整 |
| | | _write_float(buf, accel_bias[0]) |
| | | _write_float(buf, accel_bias[1]) |
| | | _write_float(buf, accel_bias[2]) |
| | | _write_float(buf, gyro_bias[0]) |
| | | _write_float(buf, gyro_bias[1]) |
| | | _write_float(buf, gyro_bias[2]) |
| | | _write_float(buf, temperature_c) |
| | | _write_u32(buf, status_flags) |
| | | _append_tail(buf) |
| | | return bytes(buf) |
| | | |
| | | |
| | | def build_gpimu_sentence( |
| | | def build_im23a_imu_frame( |
| | | timestamp: datetime, |
| | | accel_g: Tuple[float, float, float], |
| | | gyro_deg_s: Tuple[float, float, float], |
| | | temperature_c: float, |
| | | reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0), |
| | | ) -> bytes: |
| | | timestamp = _ensure_utc(timestamp) |
| | | utc_str = format_hhmmss(timestamp, decimals=3) |
| | | |
| | | fields = [ |
| | | utc_str, |
| | | f"{accel_g[0]:+.4f}", |
| | | f"{accel_g[1]:+.4f}", |
| | | f"{accel_g[2]:+.4f}", |
| | | f"{gyro_deg_s[0]:+.4f}", |
| | | f"{gyro_deg_s[1]:+.4f}", |
| | | f"{gyro_deg_s[2]:+.4f}", |
| | | f"{temperature_c:.2f}", |
| | | ] |
| | | |
| | | body = "$GPIMU," + ",".join(fields) |
| | | checksum = nmea_checksum(body) |
| | | sentence = f"{body}*{checksum}\r\n" |
| | | return sentence.encode("ascii") |
| | | buf = bytearray(IM23A_IMU_HEADER) |
| | | _write_double(buf, _seconds_of_day(timestamp)) |
| | | _write_float(buf, accel_g[0]) |
| | | _write_float(buf, accel_g[1]) |
| | | _write_float(buf, accel_g[2]) |
| | | _write_float(buf, gyro_deg_s[0]) |
| | | _write_float(buf, gyro_deg_s[1]) |
| | | _write_float(buf, gyro_deg_s[2]) |
| | | _write_float(buf, reserves[0]) |
| | | _write_float(buf, reserves[1]) |
| | | _write_float(buf, reserves[2]) |
| | | _append_tail(buf) |
| | | return bytes(buf) |
| | | |
| | | |
| | | def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float: |
| | | ratio = _clamp((value - center) / span, -1.0, 1.0) |
| | | return ratio * max_speed |
| | | |
| | | |
| | | def _clamp(val: float, min_val: float, max_val: float) -> float: |
| | | return max(min_val, min(max_val, val)) |
| | | def _seconds_of_day(dt: datetime) -> float: |
| | | utc = _ensure_utc(dt) |
| | | return ( |
| | | utc.hour * 3600 |
| | | + utc.minute * 60 |
| | | + utc.second |
| | | + utc.microsecond / 1_000_000.0 |
| | | ) |
| | | |
| | | |
| | | def _decode_payload(payload: bytes) -> Tuple[float, float]: |
| | |
| | | if len(payload) == 4: |
| | | steering_pwm, throttle_pwm = struct.unpack("<HH", payload) |
| | | # 重新映射:转向 PWM → yaw rate;油门 PWM → 线速度 |
| | | forward = _pwm_to_velocity(throttle_pwm) |
| | | forward = _pwm_to_velocity(throttle_pwm, max_speed=1.0) |
| | | turn = math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0)) |
| | | return forward, turn |
| | | raise ValueError(f"不支持的控制负载长度: {len(payload)}") |
| | |
| | | turn: float |
| | | |
| | | |
| | | @dataclass |
| | | class PythonAsciiMessage: |
| | | tag: str |
| | | fields: list[str] |
| | | |
| | | |
| | | @dataclass |
| | | class ControlStatus: |
| | | forward_mps: float |
| | | turn_rate: float |
| | | freq_hz: float |
| | | steering_pwm: int |
| | | throttle_pwm: int |
| | | stage: str |
| | | timestamp_ms: float |
| | | east: float |
| | | north: float |
| | | up: float |
| | | heading_deg: float |
| | | target_heading_deg: float |
| | | target_east: float |
| | | target_north: float |
| | | |
| | | |
| | | @dataclass |
| | | class PoseStatus: |
| | | east: float |
| | | north: float |
| | | up: float |
| | | heading_deg: float |
| | | pitch_deg: float |
| | | roll_deg: float |
| | | target_east: float |
| | | target_north: float |
| | | timestamp_ms: float |
| | | |
| | | |
| | | @dataclass |
| | | class StateStatus: |
| | | stage: str |
| | | cross_track_error: float |
| | | heading_error_deg: float |
| | | timestamp_ms: float |
| | | |
| | | |
| | | @dataclass |
| | | class StackStatus: |
| | | task_name: str |
| | | stack_high_water: int |
| | | heap_free_bytes: int |
| | | |
| | | |
| | | class PythonLinkDecoder: |
| | | """ |
| | | 解析 PythonLink 控制帧 (0xAA 0x55 ... 0D 0A)。 |
| | |
| | | return PythonLinkFrame(forward=forward, turn=turn) |
| | | |
| | | |
| | | def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]: |
| | | if not line: |
| | | return None |
| | | line = line.strip() |
| | | if not line.startswith("$"): |
| | | return None |
| | | checksum_str = "" |
| | | data_end = len(line) |
| | | star_idx = line.find("*") |
| | | if star_idx != -1: |
| | | data_end = star_idx |
| | | checksum_str = line[star_idx + 1 : star_idx + 3] |
| | | payload = line[1:data_end] |
| | | calc = 0 |
| | | for ch in payload: |
| | | calc ^= ord(ch) |
| | | if checksum_str: |
| | | try: |
| | | provided = int(checksum_str, 16) |
| | | except ValueError: |
| | | return None |
| | | if provided != calc: |
| | | return None |
| | | parts = payload.split(",") |
| | | if not parts: |
| | | return None |
| | | tag = parts[0] |
| | | fields = parts[1:] |
| | | return PythonAsciiMessage(tag=tag, fields=fields) |
| | | |
| | | |
| | | def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]: |
| | | if msg.tag.upper() != "CTRL" or len(msg.fields) < 14: |
| | | return None |
| | | try: |
| | | forward = float(msg.fields[0]) |
| | | turn = float(msg.fields[1]) |
| | | freq = float(msg.fields[2]) |
| | | steering = int(float(msg.fields[3])) |
| | | throttle = int(float(msg.fields[4])) |
| | | stage = msg.fields[5] |
| | | timestamp = float(msg.fields[6]) |
| | | east = float(msg.fields[7]) |
| | | north = float(msg.fields[8]) |
| | | up = float(msg.fields[9]) |
| | | heading = float(msg.fields[10]) |
| | | target_heading = float(msg.fields[11]) |
| | | target_e = float(msg.fields[12]) |
| | | target_n = float(msg.fields[13]) |
| | | except ValueError: |
| | | return None |
| | | return ControlStatus( |
| | | forward_mps=forward, |
| | | turn_rate=turn, |
| | | freq_hz=freq, |
| | | steering_pwm=steering, |
| | | throttle_pwm=throttle, |
| | | stage=stage, |
| | | timestamp_ms=timestamp, |
| | | east=east, |
| | | north=north, |
| | | up=up, |
| | | heading_deg=heading, |
| | | target_heading_deg=target_heading, |
| | | target_east=target_e, |
| | | target_north=target_n, |
| | | ) |
| | | |
| | | |
| | | def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]: |
| | | if msg.tag.upper() != "POSE" or len(msg.fields) < 8: |
| | | return None |
| | | try: |
| | | east = float(msg.fields[0]) |
| | | north = float(msg.fields[1]) |
| | | up = float(msg.fields[2]) |
| | | heading = float(msg.fields[3]) |
| | | pitch = float(msg.fields[4]) |
| | | roll = float(msg.fields[5]) |
| | | target_e = float(msg.fields[6]) |
| | | target_n = float(msg.fields[7]) |
| | | timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0 |
| | | except ValueError: |
| | | return None |
| | | return PoseStatus( |
| | | east=east, |
| | | north=north, |
| | | up=up, |
| | | heading_deg=heading, |
| | | pitch_deg=pitch, |
| | | roll_deg=roll, |
| | | target_east=target_e, |
| | | target_north=target_n, |
| | | timestamp_ms=timestamp, |
| | | ) |
| | | |
| | | |
| | | def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]: |
| | | if msg.tag.upper() != "STATE" or len(msg.fields) < 3: |
| | | return None |
| | | stage = msg.fields[0] |
| | | try: |
| | | xte = float(msg.fields[1]) |
| | | heading_err = float(msg.fields[2]) |
| | | timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0 |
| | | except ValueError: |
| | | return None |
| | | return StateStatus( |
| | | stage=stage, |
| | | cross_track_error=xte, |
| | | heading_error_deg=heading_err, |
| | | timestamp_ms=timestamp, |
| | | ) |
| | | |
| | | |
| | | def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]: |
| | | if msg.tag.upper() != "STACK" or len(msg.fields) < 3: |
| | | return None |
| | | task = msg.fields[0] |
| | | try: |
| | | stack_hw = int(float(msg.fields[1])) |
| | | heap_free = int(float(msg.fields[2])) |
| | | except ValueError: |
| | | return None |
| | | return StackStatus(task_name=task, |
| | | stack_high_water=stack_hw, |
| | | heap_free_bytes=heap_free) |
| | | |
| | | |