yincheng.zhong
2 天以前 567085ead3f6adaabd884f16ab4b17c62e8f0403
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
"""
与 STM32H7 交互的协议封装:
- 构造 IM23A "fmin"/"fmim" 传感器帧
- 解析 PythonLink 控制帧
- 解析 ASCII 状态报文
"""
 
from __future__ import annotations
 
import struct
import math
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Optional, Tuple
def _pwm_to_velocity(value: int, center: int = 1500, span: int = 500, max_speed: float = 1.5) -> float:
    """
    将 PWM 信号转换为线速度或角速度。
    基于 motion_config.h 中的模型参数:
    Forward: v = K_fwd * (1500 - pwm) + Bias (但模拟器这里主要是为了闭环,不需要完美匹配Bias,只要方向对)
             1000 PWM = 0.56 m/s
             1500 PWM = 0.0 m/s
             2000 PWM = -0.2 m/s (backward)
    Turn:    w = K_turn * (1500 - pwm)
             1000 PWM = 63 deg/s (Left)
             2000 PWM = -61.5 deg/s (Right)
    """
    if value < 1000: value = 1000
    if value > 2000: value = 2000
    
    # 使用参数
    # MC_CFG_FORWARD_K = 0.00196
    # MC_CFG_FORWARD_BIAS = -0.077
    # MC_CFG_STEERING_K_LEFT = 0.00206
    # MC_CFG_STEERING_K_RIGHT = 0.00201
    
    # 简单的线性映射用于模拟,忽略死区和偏置的细节,保证大致范围正确即可
    # 真实速度 = K * (1500 - pwm) + Bias
    
    delta = 1500 - value
    # 模拟器需要返回速度值,而不是设置速度。
    # 这里的 max_speed 参数不再直接作为比例系数,而是用于指示是线速度还是角速度的缩放
    
    if max_speed < 10.0: # 线速度模式 (m/s)
        # 假设是 Forward PWM
        # 1000 -> 500 diff -> 0.56 m/s => 0.00112 m/s/us
        # 2000 -> -500 diff -> -0.56 m/s (实际车可能是0.2,但模拟器可以对称)
        # 使用平均 K = 0.0015
        k_lin = 0.0015 
        if abs(delta) < 40: return 0.0 # 死区
        return delta * k_lin
    else: # 角速度模式 (deg/s -> rad/s)
        # 假设是 Steering PWM
        # 1000 -> 500 diff -> 63 deg/s => 0.126 deg/s/us
        # 2000 -> -500 diff -> -61.5 deg/s
        # K_ang ~ 0.12 deg/s/us
        k_ang_deg = 0.12
        if abs(delta) < 10: return 0.0 # 死区
        deg_s = delta * k_ang_deg
        # 转换为 rad/s ? 下面调用是 math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
        # 所以这里只需要返回 deg/s,调用者会转 rad/s
        return deg_s
 
 
 
IM23A_NAV_LEN = 100
IM23A_IMU_LEN = 52
IM23A_NAV_HEADER = b"fmin"
IM23A_IMU_HEADER = b"fmim"
IM23A_TAIL = b"ed"
 
 
def _ensure_utc(dt: datetime) -> datetime:
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)
 
 
def nmea_checksum(sentence_without_star: str) -> str:
    cs = 0
    for ch in sentence_without_star[1:]:
        cs ^= ord(ch)
    return f"{cs:02X}"
 
 
def _write_double(buf: bytearray, value: float) -> None:
    buf.extend(struct.pack("<d", float(value)))
 
 
def _write_float(buf: bytearray, value: float) -> None:
    buf.extend(struct.pack("<f", float(value)))
 
 
def _write_u32(buf: bytearray, value: int) -> None:
    buf.extend(struct.pack("<I", int(value)))
 
 
def _write_u16(buf: bytearray, value: int) -> None:
    buf.extend(struct.pack("<H", int(value)))
 
 
def _append_tail(buf: bytearray) -> None:
    checksum = sum(buf) & 0xFFFF
    buf.extend(struct.pack("<H", checksum))
    buf.extend(IM23A_TAIL)
 
 
def build_im23a_nav_frame(
    timestamp: datetime,
    lat_deg: float,
    lon_deg: float,
    alt_m: float,
    east_vel: float,
    north_vel: float,
    up_vel: float,
    heading_rad: float,
    pitch_deg: float,
    roll_deg: float,
    accel_bias: Tuple[float, float, float],
    gyro_bias: Tuple[float, float, float],
    temperature_c: float,
    status_flags: int,
) -> bytes:
    buf = bytearray(IM23A_NAV_HEADER)
    _write_double(buf, _seconds_of_day(timestamp))
    _write_double(buf, lat_deg)
    _write_double(buf, lon_deg)
    _write_double(buf, alt_m)
    _write_float(buf, north_vel)
    _write_float(buf, east_vel)
    _write_float(buf, -up_vel)  # IM23A 向下正,MCU 内部转换回向上
    _write_float(buf, roll_deg)
    _write_float(buf, pitch_deg)
    _write_float(buf, heading_rad)  # 航向角:弧度值,范围-3.14到3.14,0度正北方向
    _write_float(buf, 0.02)  # 定位精度,可根据需要调整
    _write_float(buf, accel_bias[0])
    _write_float(buf, accel_bias[1])
    _write_float(buf, accel_bias[2])
    _write_float(buf, gyro_bias[0])
    _write_float(buf, gyro_bias[1])
    _write_float(buf, gyro_bias[2])
    _write_float(buf, temperature_c)
    _write_u32(buf, status_flags)
    _append_tail(buf)
    return bytes(buf)
 
 
def build_im23a_imu_frame(
    timestamp: datetime,
    accel_g: Tuple[float, float, float],
    gyro_deg_s: Tuple[float, float, float],
    reserves: Tuple[float, float, float] = (0.0, 0.0, 0.0),
) -> bytes:
    buf = bytearray(IM23A_IMU_HEADER)
    _write_double(buf, _seconds_of_day(timestamp))
    _write_float(buf, accel_g[0])
    _write_float(buf, accel_g[1])
    _write_float(buf, accel_g[2])
    _write_float(buf, gyro_deg_s[0])
    _write_float(buf, gyro_deg_s[1])
    _write_float(buf, gyro_deg_s[2])
    _write_float(buf, reserves[0])
    _write_float(buf, reserves[1])
    _write_float(buf, reserves[2])
    _append_tail(buf)
    return bytes(buf)
 
 
def _seconds_of_day(dt: datetime) -> float:
    utc = _ensure_utc(dt)
    return (
        utc.hour * 3600
        + utc.minute * 60
        + utc.second
        + utc.microsecond / 1_000_000.0
    )
 
 
def _decode_payload(payload: bytes) -> Tuple[float, float]:
    if len(payload) == 8:
        forward, turn = struct.unpack("<ff", payload)
        return forward, turn
    if len(payload) == 4:
        steering_pwm, throttle_pwm = struct.unpack("<HH", payload)
        # 重新映射:转向 PWM → yaw rate;油门 PWM → 线速度
        forward = _pwm_to_velocity(throttle_pwm, max_speed=1.0)
        turn = math.radians(_pwm_to_velocity(steering_pwm, max_speed=90.0))
        return forward, turn
    raise ValueError(f"不支持的控制负载长度: {len(payload)}")
 
 
@dataclass
class PythonLinkFrame:
    forward: float
    turn: float
 
 
@dataclass
class PythonAsciiMessage:
    tag: str
    fields: list[str]
 
 
@dataclass
class ControlStatus:
    forward_mps: float
    turn_rate: float
    freq_hz: float
    steering_pwm: int
    throttle_pwm: int
    stage: str
    timestamp_ms: float
    east: float
    north: float
    up: float
    heading_deg: float
    target_heading_deg: float
    target_east: float
    target_north: float
 
 
@dataclass
class PoseStatus:
    east: float
    north: float
    up: float
    heading_deg: float
    pitch_deg: float
    roll_deg: float
    target_east: float
    target_north: float
    timestamp_ms: float
 
 
@dataclass
class StateStatus:
    stage: str
    cross_track_error: float
    heading_error_deg: float
    timestamp_ms: float
 
 
@dataclass
class StackStatus:
    task_name: str
    stack_high_water: int
    heap_free_bytes: int
 
 
class PythonLinkDecoder:
    """
    解析 PythonLink 控制帧 (0xAA 0x55 ... 0D 0A)。
    """
 
    FRAME_HEADER = b"\xAA\x55"
    FRAME_FOOTER = b"\x0D\x0A"
    TYPE_CONTROL = 0x10
 
    def __init__(self, on_frame: Callable[[PythonLinkFrame], None]):
        self._buffer = bytearray()
        self._on_frame = on_frame
 
    def feed(self, data: bytes):
        if not data:
            return
        self._buffer.extend(data)
        while True:
            start = self._find_header()
            if start < 0:
                self._buffer.clear()
                return
            if start > 0:
                del self._buffer[:start]
            if len(self._buffer) < 9:
                return
            frame_len = self._expected_frame_length()
            if frame_len is None or len(self._buffer) < frame_len:
                return
            frame = bytes(self._buffer[:frame_len])
            del self._buffer[:frame_len]
            try:
                parsed = self._parse_frame(frame)
                if parsed:
                    self._on_frame(parsed)
            except Exception:
                # 丢弃无效帧,继续搜索
                continue
 
    def _find_header(self) -> int:
        data = bytes(self._buffer)
        idx = data.find(self.FRAME_HEADER)
        return idx
 
    def _expected_frame_length(self) -> Optional[int]:
        if len(self._buffer) < 5:
            return None
        payload_len = int.from_bytes(self._buffer[3:5], "little")
        return 2 + 1 + 2 + payload_len + 2 + 2  # header + type + len + payload + checksum + footer
 
    def _parse_frame(self, frame: bytes) -> Optional[PythonLinkFrame]:
        if not (frame.startswith(self.FRAME_HEADER) and frame.endswith(self.FRAME_FOOTER)):
            return None
        frame_type = frame[2]
        if frame_type != self.TYPE_CONTROL:
            return None
        payload_len = int.from_bytes(frame[3:5], "little")
        payload_start = 5
        payload_end = payload_start + payload_len
        payload = frame[payload_start:payload_end]
        checksum_received = int.from_bytes(frame[payload_end:payload_end + 2], "little")
        checksum_calc = sum(frame[2:payload_end]) & 0xFFFF
        if checksum_received != checksum_calc:
            raise ValueError("校验和不匹配")
        forward, turn = _decode_payload(payload)
        return PythonLinkFrame(forward=forward, turn=turn)
 
 
def parse_ascii_message(line: str) -> Optional[PythonAsciiMessage]:
    if not line:
        return None
    line = line.strip()
    if not line.startswith("$"):
        return None
    checksum_str = ""
    data_end = len(line)
    star_idx = line.find("*")
    if star_idx != -1:
        data_end = star_idx
        checksum_str = line[star_idx + 1 : star_idx + 3]
    payload = line[1:data_end]
    calc = 0
    for ch in payload:
        calc ^= ord(ch)
    if checksum_str:
        try:
            provided = int(checksum_str, 16)
        except ValueError:
            return None
        if provided != calc:
            return None
    parts = payload.split(",")
    if not parts:
        return None
    tag = parts[0]
    fields = parts[1:]
    return PythonAsciiMessage(tag=tag, fields=fields)
 
 
def decode_control_status(msg: PythonAsciiMessage) -> Optional[ControlStatus]:
    if msg.tag.upper() != "CTRL" or len(msg.fields) < 14:
        return None
    try:
        forward = float(msg.fields[0])
        turn = float(msg.fields[1])
        freq = float(msg.fields[2])
        steering = int(float(msg.fields[3]))
        throttle = int(float(msg.fields[4]))
        stage = msg.fields[5]
        timestamp = float(msg.fields[6])
        east = float(msg.fields[7])
        north = float(msg.fields[8])
        up = float(msg.fields[9])
        heading = float(msg.fields[10])
        target_heading = float(msg.fields[11])
        target_e = float(msg.fields[12])
        target_n = float(msg.fields[13])
    except ValueError:
        return None
    return ControlStatus(
        forward_mps=forward,
        turn_rate=turn,
        freq_hz=freq,
        steering_pwm=steering,
        throttle_pwm=throttle,
        stage=stage,
        timestamp_ms=timestamp,
        east=east,
        north=north,
        up=up,
        heading_deg=heading,
        target_heading_deg=target_heading,
        target_east=target_e,
        target_north=target_n,
    )
 
 
def decode_pose_status(msg: PythonAsciiMessage) -> Optional[PoseStatus]:
    if msg.tag.upper() != "POSE" or len(msg.fields) < 8:
        return None
    try:
        east = float(msg.fields[0])
        north = float(msg.fields[1])
        up = float(msg.fields[2])
        heading = float(msg.fields[3])
        pitch = float(msg.fields[4])
        roll = float(msg.fields[5])
        target_e = float(msg.fields[6])
        target_n = float(msg.fields[7])
        timestamp = float(msg.fields[8]) if len(msg.fields) > 8 else 0.0
    except ValueError:
        return None
    return PoseStatus(
        east=east,
        north=north,
        up=up,
        heading_deg=heading,
        pitch_deg=pitch,
        roll_deg=roll,
        target_east=target_e,
        target_north=target_n,
        timestamp_ms=timestamp,
    )
 
 
def decode_state_status(msg: PythonAsciiMessage) -> Optional[StateStatus]:
    if msg.tag.upper() != "STATE" or len(msg.fields) < 3:
        return None
    stage = msg.fields[0]
    try:
        xte = float(msg.fields[1])
        heading_err = float(msg.fields[2])
        timestamp = float(msg.fields[3]) if len(msg.fields) > 3 else 0.0
    except ValueError:
        return None
    return StateStatus(
        stage=stage,
        cross_track_error=xte,
        heading_error_deg=heading_err,
        timestamp_ms=timestamp,
    )
 
 
def decode_stack_status(msg: PythonAsciiMessage) -> Optional[StackStatus]:
    if msg.tag.upper() != "STACK" or len(msg.fields) < 3:
        return None
    task = msg.fields[0]
    try:
        stack_hw = int(float(msg.fields[1]))
        heap_free = int(float(msg.fields[2]))
    except ValueError:
        return None
    return StackStatus(task_name=task,
                       stack_high_water=stack_hw,
                       heap_free_bytes=heap_free)