yincheng.zhong
2 天以前 567085ead3f6adaabd884f16ab4b17c62e8f0403
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import struct
import time
 
import pytest
 
from hitl import simulator as simulator_mod
 
GGA_SAMPLE = "$GNGGA,080112.000,3949.8105069,N,11616.6876082,E,4,44,0.42,48.502,M,-6.684,M,1.0,0409*73"
 
 
def _build_control_frame(payload: bytes) -> bytes:
    header = b"\xAA\x55"
    frame_type = b"\x10"
    length = struct.pack("<H", len(payload))
    checksum = (sum(frame_type + length + payload)) & 0xFFFF
    footer = b"\x0D\x0A"
    return header + frame_type + length + payload + struct.pack("<H", checksum) + footer
 
 
class FakeSerialEndpoint:
    def __init__(self, port: str | None, baudrate: int, timeout: float = 0.0):
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.opened = False
        self.writes: list[bytes] = []
        self._read_buffer = bytearray()
 
    def open(self):
        if self.port is None:
            return
        self.opened = True
 
    def close(self):
        self.opened = False
 
    def write(self, data: bytes):
        if not data:
            return
        self.writes.append(bytes(data))
 
    def read(self, size: int = 1) -> bytes:
        if not self._read_buffer:
            return b""
        size = min(size, len(self._read_buffer))
        chunk = self._read_buffer[:size]
        del self._read_buffer[:size]
        return bytes(chunk)
 
    def readline(self) -> bytes:
        return b""
 
    def inject(self, data: bytes):
        self._read_buffer.extend(data)
 
 
def test_simulator_emits_sensor_frames(monkeypatch):
    monkeypatch.setattr(simulator_mod, "SerialEndpoint", FakeSerialEndpoint)
 
    config = simulator_mod.HitlConfig(
        uart2_port="SIM",
        uart5_port=None,
        origin_gga=GGA_SAMPLE,
        initial_enu=(0.0, 0.0, 0.0),
    )
    sim = simulator_mod.HitlSimulator(config)
    received_controls: list[tuple[float, float]] = []
    sim.on_control = lambda f, t: received_controls.append((f, t))
 
    sim.start()
    time.sleep(0.25)
 
    frame = _build_control_frame(struct.pack("<ff", 0.5, 0.2))
    sim.uart2.inject(frame)
    time.sleep(0.05)
    sim.stop()
 
    nav_frames = [pkt for pkt in sim.uart2.writes if pkt.startswith(b"fmin")]
    imu_frames = [pkt for pkt in sim.uart2.writes if pkt.startswith(b"fmim")]
 
    assert all(len(pkt) == 100 for pkt in nav_frames)
    assert all(len(pkt) == 52 for pkt in imu_frames)
 
    assert len(nav_frames) >= 2
    assert len(imu_frames) >= 10
    assert received_controls, "控制帧未被解析"
    assert received_controls[-1][0] == pytest.approx(0.5, rel=0.1)
    assert received_controls[-1][1] == pytest.approx(0.2, rel=0.1)