yincheng.zhong
2025-12-04 f88f3da8f132cd1dd321dfc584a1fe68b6eb2138
python/hitl/simulator.py
@@ -1,5 +1,5 @@
"""
硬件在环 (HITL) 仿真器:生成 $GPRMI/$GPIMU 传感器数据,并与 STM32H7 通过 PythonLink 闭环。
硬件在环 (HITL) 仿真器:生成 IM23A fmin/fmim 传感器帧,并与 STM32H7 通过 PythonLink 闭环。
"""
from __future__ import annotations
@@ -26,8 +26,8 @@
    PythonLinkFrame,
    StackStatus,
    StateStatus,
    build_gpimu_sentence,
    build_gprmi_sentence,
    build_im23a_imu_frame,
    build_im23a_nav_frame,
    decode_control_status,
    decode_pose_status,
    decode_stack_status,
@@ -130,6 +130,7 @@
    baseline_distance: float = 0.9
    max_linear_speed: float = 2.0
    max_angular_speed_deg: float = 140.0
    position_quality: int = 4
class SerialEndpoint:
@@ -262,8 +263,8 @@
        self._running.set()
        self._threads = [
            threading.Thread(target=self._loop_physics, name="hitl-phys", daemon=True),
            threading.Thread(target=self._loop_gprmi, name="hitl-gprmi", daemon=True),
            threading.Thread(target=self._loop_gpimu, name="hitl-gpimu", daemon=True),
            threading.Thread(target=self._loop_nav, name="hitl-nav", daemon=True),
            threading.Thread(target=self._loop_imu, name="hitl-imu", daemon=True),
            threading.Thread(target=self._loop_control, name="hitl-ctrl", daemon=True),
            threading.Thread(target=self._loop_log, name="hitl-log", daemon=True),
        ]
@@ -295,24 +296,24 @@
                self._sim_time += timedelta(seconds=dt)
            time.sleep(0.005)
    def _loop_gprmi(self):
    def _loop_nav(self):
        period = 0.1  # 10 Hz
        while self._running.is_set():
            start = time.perf_counter()
            sentence, gps_time_s = self._build_gprmi_sentence()
            if sentence:
                self.uart2.write(sentence)
                self._log_ascii("PY->STM32 UART2 GPFMI", sentence, gps_time_s=gps_time_s, source_rank=0)
            frame, gps_time_s = self._build_nav_frame()
            if frame:
                self.uart2.write(frame)
                self._log_binary("PY->STM32 UART2 IM23A_NAV", frame, gps_time_s=gps_time_s, source_rank=0)
            self._sleep_remaining(start, period)
    def _loop_gpimu(self):
    def _loop_imu(self):
        period = 0.01  # 100 Hz
        while self._running.is_set():
            start = time.perf_counter()
            sentence, gps_time_s = self._build_gpimu_sentence()
            if sentence:
                self.uart2.write(sentence)
                self._log_ascii("PY->STM32 UART2 GPIMU", sentence, gps_time_s=gps_time_s, source_rank=0)
            frame, gps_time_s = self._build_imu_frame()
            if frame:
                self.uart2.write(frame)
                self._log_binary("PY->STM32 UART2 IM23A_IMU", frame, gps_time_s=gps_time_s, source_rank=0)
            self._sleep_remaining(start, period)
    def _loop_control(self):
@@ -351,6 +352,7 @@
                        gps_time_s=_ascii_timestamp_to_seconds(ctrl.timestamp_ms),
                        source_rank=1,
                    )
                    self._apply_ascii_control(ctrl)
                    handled = True
                pose = decode_pose_status(msg)
                if pose and self.on_pose_status:
@@ -384,40 +386,39 @@
    # ------------------------------------------------------------------ #
    # 构造帧
    # ------------------------------------------------------------------ #
    def _build_gprmi_sentence(self) -> tuple[bytes | None, float]:
    def _build_nav_frame(self) -> tuple[bytes | None, float]:
        state, timestamp = self._snapshot()
        lat, lon, alt = geo.enu_to_lla(state.east, state.north, state.up, self.origin)
        heading_nav = geo.heading_math_to_nav(state.heading)
        gps_time_s = _seconds_of_day(timestamp)
        return (
            build_gprmi_sentence(
                timestamp=timestamp,
                lat_deg=lat,
                lon_deg=lon,
                alt_m=alt,
                east_vel=state.east_velocity,
                north_vel=state.north_velocity,
                up_vel=state.up_velocity,
                heading_deg=heading_nav,
                pitch_deg=state.pitch_deg,
                roll_deg=state.roll_deg,
                baseline_m=self.config.baseline_distance,
            ),
            gps_time_s,
        status_flags = int(self.config.position_quality) & 0xFF
        frame = build_im23a_nav_frame(
            timestamp=timestamp,
            lat_deg=lat,
            lon_deg=lon,
            alt_m=alt,
            east_vel=state.east_velocity,
            north_vel=state.north_velocity,
            up_vel=state.up_velocity,
            heading_deg=heading_nav,
            pitch_deg=state.pitch_deg,
            roll_deg=state.roll_deg,
            accel_bias=(0.0, 0.0, 0.0),
            gyro_bias=(0.0, 0.0, 0.0),
            temperature_c=state.temperature_c,
            status_flags=status_flags,
        )
        return frame, gps_time_s
    def _build_gpimu_sentence(self) -> tuple[bytes | None, float]:
    def _build_imu_frame(self) -> tuple[bytes | None, float]:
        state, timestamp = self._snapshot()
        gps_time_s = _seconds_of_day(timestamp)
        return (
            build_gpimu_sentence(
                timestamp=timestamp,
                accel_g=state.body_accel_g,
                gyro_deg_s=state.gyro_deg_s,
                temperature_c=state.temperature_c,
            ),
            gps_time_s,
        frame = build_im23a_imu_frame(
            timestamp=timestamp,
            accel_g=state.body_accel_g,
            gyro_deg_s=state.gyro_deg_s,
        )
        return frame, gps_time_s
    # ------------------------------------------------------------------ #
    # 工具
@@ -469,6 +470,13 @@
    # ------------------------------------------------------------------ #
    # 日志工具
    # ------------------------------------------------------------------ #
    def _apply_ascii_control(self, ctrl: ControlStatus):
        with self._state_lock:
            self._target_linear = ctrl.forward_mps
            self._target_angular = ctrl.turn_rate
        if self.on_control:
            self.on_control(ctrl.forward_mps, ctrl.turn_rate)
    def _log_ascii(
        self,
        prefix: str,