import struct
|
import time
|
|
import pytest
|
|
from hitl import simulator as simulator_mod
|
|
# Sample "$GNGGA" sentence; the test below passes it to HitlConfig as origin_gga.
GGA_SAMPLE = "$GNGGA,080112.000,3949.8105069,N,11616.6876082,E,4,44,0.42,48.502,M,-6.684,M,1.0,0409*73"
|
|
|
def _build_control_frame(payload: bytes) -> bytes:
|
header = b"\xAA\x55"
|
frame_type = b"\x10"
|
length = struct.pack("<H", len(payload))
|
checksum = (sum(frame_type + length + payload)) & 0xFFFF
|
footer = b"\x0D\x0A"
|
return header + frame_type + length + payload + struct.pack("<H", checksum) + footer
|
|
|
class FakeSerialEndpoint:
|
def __init__(self, port: str | None, baudrate: int, timeout: float = 0.0):
|
self.port = port
|
self.baudrate = baudrate
|
self.timeout = timeout
|
self.opened = False
|
self.writes: list[bytes] = []
|
self._read_buffer = bytearray()
|
|
def open(self):
|
if self.port is None:
|
return
|
self.opened = True
|
|
def close(self):
|
self.opened = False
|
|
def write(self, data: bytes):
|
if not data:
|
return
|
self.writes.append(bytes(data))
|
|
def read(self, size: int = 1) -> bytes:
|
if not self._read_buffer:
|
return b""
|
size = min(size, len(self._read_buffer))
|
chunk = self._read_buffer[:size]
|
del self._read_buffer[:size]
|
return bytes(chunk)
|
|
def readline(self) -> bytes:
|
return b""
|
|
def inject(self, data: bytes):
|
self._read_buffer.extend(data)
|
|
|
def test_simulator_emits_sensor_frames(monkeypatch):
    """Run the simulator on fake serial endpoints and check its traffic.

    The simulator module's ``SerialEndpoint`` is replaced with
    ``FakeSerialEndpoint``, the simulator is left running briefly, and a
    control frame is injected on ``uart2``.  The test then checks that:

    * every write starting with ``b"fmin"`` is 100 bytes long and at least
      two of them were written;
    * every write starting with ``b"fmim"`` is 52 bytes long and at least
      ten of them were written;
    * ``on_control`` was called and its latest arguments are approximately
      the two floats packed into the injected frame.
    """
    monkeypatch.setattr(simulator_mod, "SerialEndpoint", FakeSerialEndpoint)

    config = simulator_mod.HitlConfig(
        uart2_port="SIM",
        uart5_port=None,
        origin_gga=GGA_SAMPLE,
        initial_enu=(0.0, 0.0, 0.0),
    )
    sim = simulator_mod.HitlSimulator(config)
    received_controls: list[tuple[float, float]] = []
    sim.on_control = lambda f, t: received_controls.append((f, t))

    sim.start()
    try:
        # Give the simulator time to write a batch of frames.
        time.sleep(0.25)

        frame = _build_control_frame(struct.pack("<ff", 0.5, 0.2))
        sim.uart2.inject(frame)
        # Give the simulator time to read and handle the injected frame.
        time.sleep(0.05)
    finally:
        # Always stop the simulator, even if the steps above raise, so a
        # failure here cannot leave it running into later tests.
        sim.stop()

    nav_frames = [pkt for pkt in sim.uart2.writes if pkt.startswith(b"fmin")]
    imu_frames = [pkt for pkt in sim.uart2.writes if pkt.startswith(b"fmim")]

    assert all(len(pkt) == 100 for pkt in nav_frames)
    assert all(len(pkt) == 52 for pkt in imu_frames)

    assert len(nav_frames) >= 2
    assert len(imu_frames) >= 10
    # The assertion message reads "control frame was not parsed".
    assert received_controls, "控制帧未被解析"
    assert received_controls[-1][0] == pytest.approx(0.5, rel=0.1)
    assert received_controls[-1][1] == pytest.approx(0.2, rel=0.1)
|