| | |
"""
Protocol helpers for talking to the STM32H7:
- build $GPRMI / $GPIMU sentences
- build IM23A "fmin"/"fmim" sensor frames
- parse PythonLink control frames
- parse ASCII status messages
"""
| | | |
| | | from __future__ import annotations |
| | | |
| | | import math |
| | | import struct |
| | | import math |
| | | from dataclasses import dataclass |
| | | from datetime import datetime, timezone |
| | | from typing import Callable, Optional, Tuple |
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
    """Map a PWM value onto a signed velocity in ``[-max_speed, +max_speed]``.

    Args:
        value: PWM reading to convert.
        center: PWM value that maps to zero velocity.
        span: Distance from ``center`` that maps to full scale.
        max_speed: Velocity magnitude returned at full scale.

    Returns:
        ``max_speed`` multiplied by ``(value - center) / span`` after that
        ratio has been clamped to ``[-1.0, 1.0]``.

    Raises:
        ZeroDivisionError: If ``span`` is 0.
    """
    # NOTE(review): an identical definition appears again further down this
    # module; the later one rebinds this name at import time.
    ratio = _clamp((value - center) / span, -1.0, 1.0)
    return ratio * max_speed
| | | |
# Zero point used for the GPS week / time-of-week computation.
GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)


# IM23A binary frame layout constants.
# Total frame sizes in bytes, counting header, payload, 16-bit checksum and tail.
IM23A_NAV_LEN = 100
IM23A_IMU_LEN = 52
# Four-byte ASCII markers that open the navigation and IMU frames.
IM23A_NAV_HEADER = b"fmin"
IM23A_IMU_HEADER = b"fmim"
# Two-byte marker appended after the checksum of every frame.
IM23A_TAIL = b"ed"
| | | |
| | | |
| | | def _ensure_utc(dt: datetime) -> datetime: |
| | |
| | | return dt.astimezone(timezone.utc) |
| | | |
| | | |
def gps_week_and_tow(timestamp: datetime) -> Tuple[int, float]:
    """Split a timestamp into a week number and seconds into that week.

    The elapsed time is measured from ``GPS_EPOCH`` after converting
    ``timestamp`` to UTC. No leap-second correction is applied: the result
    is derived purely from the UTC difference.

    Args:
        timestamp: Instant to convert.

    Returns:
        ``(week, tow)`` where ``week`` is the number of whole 604800-second
        weeks since ``GPS_EPOCH`` (floored, so it is negative before the
        epoch) and ``tow`` is the remaining seconds within that week.
    """
    seconds_per_week = 7 * 24 * 3600  # 604800
    elapsed = (_ensure_utc(timestamp) - GPS_EPOCH).total_seconds()
    week = int(elapsed // seconds_per_week)
    tow = elapsed - week * seconds_per_week
    return week, tow
| | | |
| | | |
def format_hhmmss(timestamp: datetime, decimals: int = 3) -> str:
    """Format a timestamp as UTC ``HHMMSS.fff`` with ``decimals`` fraction digits.

    The fractional part is truncated, not rounded. Rounding the fraction on
    its own can turn e.g. ``.9996`` into ``.000`` without carrying into the
    seconds field, which makes the formatted time jump backwards by almost a
    full second.

    Args:
        timestamp: Instant to format; it is converted to UTC first.
        decimals: Number of fractional-second digits. ``0`` yields plain
            ``HHMMSS`` with no decimal point; values above 6 are padded with
            trailing zeros because the source only has microsecond resolution.

    Returns:
        The formatted time-of-day string.

    Raises:
        ValueError: If ``decimals`` is negative.
    """
    if decimals < 0:
        raise ValueError("decimals must be non-negative")

    dt = _ensure_utc(timestamp)
    base = dt.strftime("%H%M%S")
    if decimals == 0:
        return base

    # Slice the zero-padded microsecond digits instead of formatting a float,
    # so the fraction can never round up to 1.000.
    frac_str = f"{dt.microsecond:06d}"[:decimals].ljust(decimals, "0")
    return f"{base}.{frac_str}"
| | | |
| | | |
def nmea_checksum(sentence_without_star: str) -> str:
    """Compute the two-digit hexadecimal XOR checksum of an NMEA-style sentence.

    Args:
        sentence_without_star: Sentence text starting with ``$`` and ending
            before the ``*`` separator. The first character is skipped and
            every remaining character is folded into the checksum.

    Returns:
        The checksum as upper-case hexadecimal, zero-padded to two digits.
        An empty or one-character input yields ``"00"``.
    """
    checksum = 0
    # A single pass over the payload; the leading "$" is not part of the sum.
    for ch in sentence_without_star[1:]:
        checksum ^= ord(ch)
    return f"{checksum:02X}"
| | | |
| | | |
| | | def build_gprmi_sentence( |
| | | def _write_double(buf: bytearray, value: float) -> None: |
| | | buf.extend(struct.pack("<d", float(value))) |
| | | |
| | | |
| | | def _write_float(buf: bytearray, value: float) -> None: |
| | | buf.extend(struct.pack("<f", float(value))) |
| | | |
| | | |
| | | def _write_u32(buf: bytearray, value: int) -> None: |
| | | buf.extend(struct.pack("<I", int(value))) |
| | | |
| | | |
| | | def _write_u16(buf: bytearray, value: int) -> None: |
| | | buf.extend(struct.pack("<H", int(value))) |
| | | |
| | | |
def _append_tail(buf: bytearray) -> None:
    """Finish a frame by appending its checksum and the tail marker.

    The checksum is the low 16 bits of the sum of every byte already in
    ``buf`` (header included), written little-endian, followed by
    ``IM23A_TAIL``.

    Args:
        buf: Frame buffer holding header and payload; extended in place.
    """
    checksum = sum(buf) & 0xFFFF
    buf.extend(struct.pack("<H", checksum))
    buf.extend(IM23A_TAIL)
| | | |
| | | |
def build_gprmi_sentence(
    timestamp: datetime,
    lat_deg: float,
    lon_deg: float,
    alt_m: float,
    east_vel: float,
    north_vel: float,
    up_vel: float,
    heading_deg: float,
    pitch_deg: float,
    roll_deg: float,
    *,
    lat_std: float = 0.008,
    lon_std: float = 0.008,
    alt_std: float = 0.02,
    vel_std: float = 0.02,
    heading_std: float = 0.1,
    pitch_std: float = 0.1,
    roll_std: float = 0.1,
    baseline_m: float = 1.0,
    sat_count: int = 20,
    ambiguity_count: int = 18,
    quality: int = 4,
) -> bytes:
    """Build the ASCII ``$GPFMI`` navigation sentence.

    NOTE(review): in the text this was recovered from, the parameters
    ``alt_m``, ``east_vel``, ``north_vel`` and ``up_vel`` were missing from the
    signature although the body uses them. They are restored after
    ``lon_deg`` in the order the sentence emits them - confirm against callers.

    Args:
        timestamp: Instant of the fix; converted to UTC.
        lat_deg, lon_deg, alt_m: Position fields, formatted with 9, 9 and 3
            decimals respectively.
        east_vel, north_vel, up_vel: Velocity components, 3 decimals each.
        heading_deg, pitch_deg, roll_deg: Attitude fields, 3 decimals each.
        lat_std, lon_std, alt_std, vel_std: Standard-deviation fields.
        heading_std, pitch_std, roll_std: Attitude standard-deviation fields.
        baseline_m: Baseline field, 3 decimals.
        sat_count, ambiguity_count, quality: Integer status fields.

    Returns:
        The sentence as ASCII bytes: comma-separated fields, ``*`` plus the
        XOR checksum, terminated by ``\\r\\n``.
    """
    timestamp = _ensure_utc(timestamp)
    utc_str = format_hhmmss(timestamp, decimals=2)
    week, tow = gps_week_and_tow(timestamp)

    fields = [
        utc_str,
        str(week),
        f"{tow:.3f}",
        f"{lat_deg:.9f}",
        f"{lon_deg:.9f}",
        f"{alt_m:.3f}",
        f"{lat_std:.3f}",
        f"{lon_std:.3f}",
        f"{alt_std:.3f}",
        f"{east_vel:.3f}",
        f"{north_vel:.3f}",
        f"{up_vel:.3f}",
        f"{vel_std:.3f}",
        f"{heading_deg:.3f}",
        f"{pitch_deg:.3f}",
        f"{roll_deg:.3f}",
        f"{heading_std:.3f}",
        f"{pitch_std:.3f}",
        f"{roll_std:.3f}",
        f"{baseline_m:.3f}",
        str(int(sat_count)),
        str(int(ambiguity_count)),
        str(int(quality)),
    ]

    body = "$GPFMI," + ",".join(fields)
    checksum = nmea_checksum(body)
    sentence = f"{body}*{checksum}\r\n"
    return sentence.encode("ascii")


def build_im23a_nav_frame(
    timestamp: datetime,
    lat_deg: float,
    lon_deg: float,
    alt_m: float,
    east_vel: float,
    north_vel: float,
    up_vel: float,
    heading_deg: float,
    pitch_deg: float,
    roll_deg: float,
    *,
    lat_std: float = 0.008,
    lon_std: float = 0.008,
    alt_std: float = 0.02,
    vel_std: float = 0.02,
    heading_std: float = 0.1,
    pitch_std: float = 0.1,
    roll_std: float = 0.1,
    baseline_m: float = 1.0,
    sat_count: int = 20,
    ambiguity_count: int = 18,
    quality: int = 4,
    accel_bias: Tuple[float, float, float],
    gyro_bias: Tuple[float, float, float],
    temperature_c: float,
    status_flags: int,
) -> bytes:
    """Build the binary IM23A ``fmin`` navigation frame.

    Frame layout (little-endian): 4-byte header ``IM23A_NAV_HEADER``; four
    doubles (seconds of the UTC day, latitude, longitude, altitude); fourteen
    floats (north, east and negated up velocity; roll, pitch, heading; a
    fixed 0.02 accuracy value; three accelerometer biases; three gyro biases;
    temperature); one unsigned 32-bit status word; then the 16-bit checksum
    and tail written by ``_append_tail``. That is 100 bytes in total.

    NOTE(review): ``alt_m``, ``east_vel``, ``north_vel`` and ``up_vel`` were
    missing from the recovered signature although the body uses them; their
    position and order here are an assumption - confirm against callers.

    Args:
        timestamp: Instant of the sample; only its UTC time of day is encoded.
        lat_deg, lon_deg, alt_m: Position, written as doubles.
        east_vel, north_vel, up_vel: Velocity components, written as floats.
        heading_deg, pitch_deg, roll_deg: Attitude, written as floats.
        lat_std, lon_std, alt_std, vel_std, heading_std, pitch_std, roll_std,
        baseline_m, sat_count, ambiguity_count, quality: Accepted to keep the
            signature compatible; none of them is encoded in this frame.
        accel_bias: Three accelerometer bias values.
        gyro_bias: Three gyro bias values.
        temperature_c: Temperature value, written as a float.
        status_flags: Status word, written as an unsigned 32-bit integer.

    Returns:
        The complete frame as ``bytes``.
    """
    buf = bytearray(IM23A_NAV_HEADER)

    for wide_value in (_seconds_of_day(timestamp), lat_deg, lon_deg, alt_m):
        _write_double(buf, wide_value)

    _write_float(buf, north_vel)
    _write_float(buf, east_vel)
    # IM23A uses down-positive; the MCU converts back to up-positive internally.
    _write_float(buf, -up_vel)

    for angle in (roll_deg, pitch_deg, heading_deg):
        _write_float(buf, angle)

    # Positioning accuracy; adjust as needed.
    _write_float(buf, 0.02)

    for bias in (*accel_bias[:3], *gyro_bias[:3]):
        _write_float(buf, bias)

    _write_float(buf, temperature_c)
    _write_u32(buf, status_flags)
    _append_tail(buf)
    return bytes(buf)
| | | |
| | | |
def build_gpimu_sentence(
    timestamp: datetime,
    accel_g: Tuple[float, float, float],
    gyro_deg_s: Tuple[float, float, float],
    temperature_c: float,
) -> bytes:
    """Build the ASCII ``$GPIMU`` inertial sentence.

    Args:
        timestamp: Instant of the sample; converted to UTC and formatted with
            three fractional-second digits.
        accel_g: Three accelerometer readings, formatted signed with 4 decimals.
        gyro_deg_s: Three gyro readings, formatted signed with 4 decimals.
        temperature_c: Temperature, formatted with 2 decimals.

    Returns:
        The sentence as ASCII bytes: comma-separated fields, ``*`` plus the
        XOR checksum, terminated by ``\\r\\n``.
    """
    timestamp = _ensure_utc(timestamp)
    utc_str = format_hhmmss(timestamp, decimals=3)

    fields = [
        utc_str,
        f"{accel_g[0]:+.4f}",
        f"{accel_g[1]:+.4f}",
        f"{accel_g[2]:+.4f}",
        f"{gyro_deg_s[0]:+.4f}",
        f"{gyro_deg_s[1]:+.4f}",
        f"{gyro_deg_s[2]:+.4f}",
        f"{temperature_c:.2f}",
    ]

    body = "$GPIMU," + ",".join(fields)
    checksum = nmea_checksum(body)
    sentence = f"{body}*{checksum}\r\n"
    return sentence.encode("ascii")


def build_im23a_imu_frame(
    timestamp: datetime,
    accel_g: Tuple[float, float, float],
    gyro_deg_s: Tuple[float, float, float],
    temperature_c: float,
    reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0),
) -> bytes:
    """Build the binary IM23A ``fmim`` IMU frame.

    Frame layout (little-endian): 4-byte header ``IM23A_IMU_HEADER``; one
    double (seconds of the UTC day); nine floats (three accelerometer values,
    three gyro values, three reserved values); then the 16-bit checksum and
    tail written by ``_append_tail``. That is 52 bytes in total.

    Args:
        timestamp: Instant of the sample; only its UTC time of day is encoded.
        accel_g: Three accelerometer readings.
        gyro_deg_s: Three gyro readings.
        temperature_c: Accepted for signature compatibility; it is not
            encoded in this frame.
        reserves: Three values written into the reserved float slots.

    Returns:
        The complete frame as ``bytes``.
    """
    buf = bytearray(IM23A_IMU_HEADER)
    _write_double(buf, _seconds_of_day(timestamp))

    for sample in (*accel_g[:3], *gyro_deg_s[:3], *reserves[:3]):
        _write_float(buf, sample)

    _append_tail(buf)
    return bytes(buf)
| | | |
| | | |
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
    """Convert a PWM value into a signed velocity bounded by ``max_speed``.

    Args:
        value: PWM reading to convert.
        center: PWM value that corresponds to zero velocity.
        span: Offset from ``center`` that corresponds to full scale.
        max_speed: Velocity magnitude produced at full scale.

    Returns:
        ``(value - center) / span`` clamped to ``[-1.0, 1.0]`` and scaled by
        ``max_speed``.

    Raises:
        ZeroDivisionError: If ``span`` is 0.
    """
    # NOTE(review): the same function is also defined near the top of this
    # module; this later definition is the one in effect after import.
    ratio = _clamp((value - center) / span, -1.0, 1.0)
    return ratio * max_speed
| | | |
| | | |
| | | def _clamp(val: float, min_val: float, max_val: float) -> float: |
| | | return max(min_val, min(max_val, val)) |
def _seconds_of_day(dt: datetime) -> float:
    """Return the seconds elapsed since UTC midnight for ``dt``.

    Args:
        dt: Instant to convert; it is normalised to UTC first.

    Returns:
        Hours, minutes, seconds and microseconds of the UTC time of day,
        combined into fractional seconds.
    """
    utc = _ensure_utc(dt)
    return (
        utc.hour * 3600
        + utc.minute * 60
        + utc.second
        + utc.microsecond / 1_000_000.0
    )
| | | |
| | | |
| | | def _decode_payload(payload: bytes) -> Tuple[float, float]: |