yincheng.zhong
2025-11-22 820749d41d8bc0fdfeb1f10283a2ba3b426e60f2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""
与 STM32H7 交互的协议封装:
- 构造 $GPRMI / $GPIMU 句子
- 解析 PythonLink 控制帧
"""
 
from __future__ import annotations
 
import math
import struct
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Optional, Tuple
 
GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)
 
 
def _ensure_utc(dt: datetime) -> datetime:
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)
 
 
def gps_week_and_tow(timestamp: datetime) -> Tuple[int, float]:
    dt = _ensure_utc(timestamp)
    delta = dt - GPS_EPOCH
    total_seconds = delta.total_seconds()
    week = int(total_seconds // 604800)
    tow = total_seconds - week * 604800
    return week, tow
 
 
def format_hhmmss(timestamp: datetime, decimals: int = 3) -> str:
    dt = _ensure_utc(timestamp)
    base = dt.strftime("%H%M%S")
    frac = dt.microsecond / 1_000_000.0
    frac_str = f"{frac:.{decimals}f}".split(".")[1]
    return f"{base}.{frac_str}"
 
 
def nmea_checksum(sentence_without_star: str) -> str:
    cs = 0
    for ch in sentence_without_star[1:]:  # 跳过 $
        cs ^= ord(ch)
    return f"{cs:02X}"
 
 
def build_gprmi_sentence(
    timestamp: datetime,
    lat_deg: float,
    lon_deg: float,
    alt_m: float,
    east_vel: float,
    north_vel: float,
    up_vel: float,
    heading_deg: float,
    pitch_deg: float,
    roll_deg: float,
    *,
    lat_std: float = 0.008,
    lon_std: float = 0.008,
    alt_std: float = 0.02,
    vel_std: float = 0.02,
    heading_std: float = 0.1,
    pitch_std: float = 0.1,
    roll_std: float = 0.1,
    baseline_m: float = 1.0,
    sat_count: int = 20,
    ambiguity_count: int = 18,
    quality: int = 4,
) -> bytes:
    timestamp = _ensure_utc(timestamp)
    utc_str = format_hhmmss(timestamp, decimals=2)
    week, tow = gps_week_and_tow(timestamp)
 
    fields = [
        utc_str,
        str(week),
        f"{tow:.3f}",
        f"{lat_deg:.9f}",
        f"{lon_deg:.9f}",
        f"{alt_m:.3f}",
        f"{lat_std:.3f}",
        f"{lon_std:.3f}",
        f"{alt_std:.3f}",
        f"{east_vel:.3f}",
        f"{north_vel:.3f}",
        f"{up_vel:.3f}",
        f"{vel_std:.3f}",
        f"{heading_deg:.3f}",
        f"{pitch_deg:.3f}",
        f"{roll_deg:.3f}",
        f"{heading_std:.3f}",
        f"{pitch_std:.3f}",
        f"{roll_std:.3f}",
        f"{baseline_m:.3f}",
        str(int(sat_count)),
        str(int(ambiguity_count)),
        str(int(quality)),
    ]
 
    body = "$GPRMI," + ",".join(fields)
    checksum = nmea_checksum(body)
    sentence = f"{body}*{checksum}\r\n"
    return sentence.encode("ascii")
 
 
def build_gpimu_sentence(
    timestamp: datetime,
    accel_g: Tuple[float, float, float],
    gyro_deg_s: Tuple[float, float, float],
    temperature_c: float,
) -> bytes:
    timestamp = _ensure_utc(timestamp)
    utc_str = format_hhmmss(timestamp, decimals=3)
 
    fields = [
        utc_str,
        f"{accel_g[0]:+.4f}",
        f"{accel_g[1]:+.4f}",
        f"{accel_g[2]:+.4f}",
        f"{gyro_deg_s[0]:+.4f}",
        f"{gyro_deg_s[1]:+.4f}",
        f"{gyro_deg_s[2]:+.4f}",
        f"{temperature_c:.2f}",
    ]
 
    body = "$GPIMU," + ",".join(fields)
    checksum = nmea_checksum(body)
    sentence = f"{body}*{checksum}\r\n"
    return sentence.encode("ascii")
 
 
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
    ratio = _clamp((value - center) / span, -1.0, 1.0)
    return ratio * max_speed
 
 
def _clamp(val: float, min_val: float, max_val: float) -> float:
    return max(min_val, min(max_val, val))
 
 
def _decode_payload(payload: bytes) -> Tuple[float, float]:
    if len(payload) == 8:
        forward, turn = struct.unpack("<ff", payload)
        return forward, turn
    if len(payload) == 4:
        steering_pwm, throttle_pwm = struct.unpack("<HH", payload)
        # 重新映射:转向 PWM → yaw rate;油门 PWM → 线速度
        forward = _pwm_to_velocity(throttle_pwm)
        turn = math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
        return forward, turn
    raise ValueError(f"不支持的控制负载长度: {len(payload)}")
 
 
@dataclass
class PythonLinkFrame:
    forward: float
    turn: float
 
 
class PythonLinkDecoder:
    """
    解析 PythonLink 控制帧 (0xAA 0x55 ... 0D 0A)。
    """
 
    FRAME_HEADER = b"\xAA\x55"
    FRAME_FOOTER = b"\x0D\x0A"
    TYPE_CONTROL = 0x10
 
    def __init__(self, on_frame: Callable[[PythonLinkFrame], None]):
        self._buffer = bytearray()
        self._on_frame = on_frame
 
    def feed(self, data: bytes):
        if not data:
            return
        self._buffer.extend(data)
        while True:
            start = self._find_header()
            if start < 0:
                self._buffer.clear()
                return
            if start > 0:
                del self._buffer[:start]
            if len(self._buffer) < 9:
                return
            frame_len = self._expected_frame_length()
            if frame_len is None or len(self._buffer) < frame_len:
                return
            frame = bytes(self._buffer[:frame_len])
            del self._buffer[:frame_len]
            try:
                parsed = self._parse_frame(frame)
                if parsed:
                    self._on_frame(parsed)
            except Exception:
                # 丢弃无效帧,继续搜索
                continue
 
    def _find_header(self) -> int:
        data = bytes(self._buffer)
        idx = data.find(self.FRAME_HEADER)
        return idx
 
    def _expected_frame_length(self) -> Optional[int]:
        if len(self._buffer) < 5:
            return None
        payload_len = int.from_bytes(self._buffer[3:5], "little")
        return 2 + 1 + 2 + payload_len + 2 + 2  # header + type + len + payload + checksum + footer
 
    def _parse_frame(self, frame: bytes) -> Optional[PythonLinkFrame]:
        if not (frame.startswith(self.FRAME_HEADER) and frame.endswith(self.FRAME_FOOTER)):
            return None
        frame_type = frame[2]
        if frame_type != self.TYPE_CONTROL:
            return None
        payload_len = int.from_bytes(frame[3:5], "little")
        payload_start = 5
        payload_end = payload_start + payload_len
        payload = frame[payload_start:payload_end]
        checksum_received = int.from_bytes(frame[payload_end:payload_end + 2], "little")
        checksum_calc = sum(frame[2:payload_end]) & 0xFFFF
        if checksum_received != checksum_calc:
            raise ValueError("校验和不匹配")
        forward, turn = _decode_payload(payload)
        return PythonLinkFrame(forward=forward, turn=turn)