yincheng.zhong
2025-12-10 8d0d8dec18cc9170f3fb82a4dba18160dd8e5233
python/hitl/simulator.py
@@ -1,5 +1,5 @@
"""
硬件在环 (HITL) 仿真器:生成 $GPRMI/$GPIMU 传感器数据,并与 STM32H7 通过 PythonLink 闭环。
硬件在环 (HITL) 仿真器:生成 IM23A fmin/fmim 传感器帧,并与 STM32H7 通过 PythonLink 闭环。
"""
from __future__ import annotations
@@ -26,8 +26,8 @@
    PythonLinkFrame,
    StackStatus,
    StateStatus,
    build_gpimu_sentence,
    build_gprmi_sentence,
    build_im23a_imu_frame,
    build_im23a_nav_frame,
    decode_control_status,
    decode_pose_status,
    decode_stack_status,
@@ -130,6 +130,7 @@
    baseline_distance: float = 0.9
    max_linear_speed: float = 2.0
    max_angular_speed_deg: float = 140.0
    position_quality: int = 4
class SerialEndpoint:
@@ -262,8 +263,8 @@
        self._running.set()
        self._threads = [
            threading.Thread(target=self._loop_physics, name="hitl-phys", daemon=True),
            threading.Thread(target=self._loop_gprmi, name="hitl-gprmi", daemon=True),
            threading.Thread(target=self._loop_gpimu, name="hitl-gpimu", daemon=True),
            threading.Thread(target=self._loop_nav, name="hitl-nav", daemon=True),
            threading.Thread(target=self._loop_imu, name="hitl-imu", daemon=True),
            threading.Thread(target=self._loop_control, name="hitl-ctrl", daemon=True),
            threading.Thread(target=self._loop_log, name="hitl-log", daemon=True),
        ]
@@ -290,29 +291,34 @@
            dt = max(now - last, 1e-4)
            last = now
            with self._state_lock:
                # 注意:这里的 self._target_linear/angular 是从 STM32 发回来的 PWM 转换后的估算值
                # 实际上是 STM32 认为的命令,或者如果我们只收 PWM 的话,它就是实际控制量
                # STM32 发送的是 target_mps 和 target_turn,或者是 PWM
                # 在 protocols.py 中,如果我们收到了 PWM,就会转换为速度
                # 如果收到的是 target_mps,那就是 target_mps
                # 现在的逻辑是:Simulator 收到 PythonLinkFrame,里面可能是 PWM 反算的 velocity
                # 这样就构成了闭环:STM32 计算 PWM -> Python 接收 PWM -> 反算速度 -> 物理模型积分 -> 传感器数据 -> STM32
                state = self.model.step(self._target_linear, self._target_angular, dt).copy()
                self._latest_state = state
                self._sim_time += timedelta(seconds=dt)
            time.sleep(0.005)
    def _loop_gprmi(self):
    def _loop_nav(self):
        period = 0.1  # 10 Hz
        while self._running.is_set():
            start = time.perf_counter()
            sentence, gps_time_s = self._build_gprmi_sentence()
            if sentence:
                self.uart2.write(sentence)
                self._log_ascii("PY->STM32 UART2 GPFMI", sentence, gps_time_s=gps_time_s, source_rank=0)
            frame, gps_time_s = self._build_nav_frame()
            if frame:
                self.uart2.write(frame)
            self._sleep_remaining(start, period)
    def _loop_gpimu(self):
    def _loop_imu(self):
        period = 0.01  # 100 Hz
        while self._running.is_set():
            start = time.perf_counter()
            sentence, gps_time_s = self._build_gpimu_sentence()
            if sentence:
                self.uart2.write(sentence)
                self._log_ascii("PY->STM32 UART2 GPIMU", sentence, gps_time_s=gps_time_s, source_rank=0)
            frame, gps_time_s = self._build_imu_frame()
            if frame:
                self.uart2.write(frame)
            self._sleep_remaining(start, period)
    def _loop_control(self):
@@ -351,6 +357,7 @@
                        gps_time_s=_ascii_timestamp_to_seconds(ctrl.timestamp_ms),
                        source_rank=1,
                    )
                    self._apply_ascii_control(ctrl)
                    handled = True
                pose = decode_pose_status(msg)
                if pose and self.on_pose_status:
@@ -384,40 +391,42 @@
    # ------------------------------------------------------------------ #
    # 构造帧
    # ------------------------------------------------------------------ #
    def _build_gprmi_sentence(self) -> tuple[bytes | None, float]:
    def _build_nav_frame(self) -> tuple[bytes | None, float]:
        state, timestamp = self._snapshot()
        lat, lon, alt = geo.enu_to_lla(state.east, state.north, state.up, self.origin)
        heading_nav = geo.heading_math_to_nav(state.heading)
        heading_nav_deg = geo.heading_math_to_nav(state.heading)
        # 将导航坐标系角度(0-360度)转换为弧度(-π到π),0度正北对应0弧度
        # 0°(正北)→0rad, 90°(正东)→π/2rad, 180°(正南)→πrad, 270°(正西)→-π/2rad
        heading_nav_rad = math.radians(heading_nav_deg) if heading_nav_deg <= 180.0 else math.radians(heading_nav_deg - 360.0)
        gps_time_s = _seconds_of_day(timestamp)
        return (
            build_gprmi_sentence(
                timestamp=timestamp,
                lat_deg=lat,
                lon_deg=lon,
                alt_m=alt,
                east_vel=state.east_velocity,
                north_vel=state.north_velocity,
                up_vel=state.up_velocity,
                heading_deg=heading_nav,
                pitch_deg=state.pitch_deg,
                roll_deg=state.roll_deg,
                baseline_m=self.config.baseline_distance,
            ),
            gps_time_s,
        status_flags = int(self.config.position_quality) & 0xFF
        frame = build_im23a_nav_frame(
            timestamp=timestamp,
            lat_deg=lat,
            lon_deg=lon,
            alt_m=alt,
            east_vel=state.east_velocity,
            north_vel=state.north_velocity,
            up_vel=state.up_velocity,
            heading_rad=heading_nav_rad,
            pitch_deg=state.pitch_deg,
            roll_deg=state.roll_deg,
            accel_bias=(0.0, 0.0, 0.0),
            gyro_bias=(0.0, 0.0, 0.0),
            temperature_c=state.temperature_c,
            status_flags=status_flags,
        )
        return frame, gps_time_s
    def _build_gpimu_sentence(self) -> tuple[bytes | None, float]:
    def _build_imu_frame(self) -> tuple[bytes | None, float]:
        state, timestamp = self._snapshot()
        gps_time_s = _seconds_of_day(timestamp)
        return (
            build_gpimu_sentence(
                timestamp=timestamp,
                accel_g=state.body_accel_g,
                gyro_deg_s=state.gyro_deg_s,
                temperature_c=state.temperature_c,
            ),
            gps_time_s,
        frame = build_im23a_imu_frame(
            timestamp=timestamp,
            accel_g=state.body_accel_g,
            gyro_deg_s=state.gyro_deg_s,
        )
        return frame, gps_time_s
    # ------------------------------------------------------------------ #
    # 工具
@@ -469,6 +478,13 @@
    # ------------------------------------------------------------------ #
    # 日志工具
    # ------------------------------------------------------------------ #
    def _apply_ascii_control(self, ctrl: ControlStatus):
        with self._state_lock:
            self._target_linear = ctrl.forward_mps
            self._target_angular = ctrl.turn_rate
        if self.on_control:
            self.on_control(ctrl.forward_mps, ctrl.turn_rate)
    def _log_ascii(
        self,
        prefix: str,