| | |
| | | """ |
| | | 硬件在环 (HITL) 仿真器:生成 $GPRMI/$GPIMU 传感器数据,并与 STM32H7 通过 PythonLink 闭环。 |
| | | 硬件在环 (HITL) 仿真器:生成 IM23A fmin/fmim 传感器帧,并与 STM32H7 通过 PythonLink 闭环。 |
| | | """ |
| | | |
| | | from __future__ import annotations |
| | |
| | | PythonLinkFrame, |
| | | StackStatus, |
| | | StateStatus, |
| | | build_gpimu_sentence, |
| | | build_gprmi_sentence, |
| | | build_im23a_imu_frame, |
| | | build_im23a_nav_frame, |
| | | decode_control_status, |
| | | decode_pose_status, |
| | | decode_stack_status, |
| | |
| | | baseline_distance: float = 0.9 |
| | | max_linear_speed: float = 2.0 |
| | | max_angular_speed_deg: float = 140.0 |
| | | position_quality: int = 4 |
| | | |
| | | |
| | | class SerialEndpoint: |
| | |
| | | self._running.set() |
| | | self._threads = [ |
| | | threading.Thread(target=self._loop_physics, name="hitl-phys", daemon=True), |
| | | threading.Thread(target=self._loop_gprmi, name="hitl-gprmi", daemon=True), |
| | | threading.Thread(target=self._loop_gpimu, name="hitl-gpimu", daemon=True), |
| | | threading.Thread(target=self._loop_nav, name="hitl-nav", daemon=True), |
| | | threading.Thread(target=self._loop_imu, name="hitl-imu", daemon=True), |
| | | threading.Thread(target=self._loop_control, name="hitl-ctrl", daemon=True), |
| | | threading.Thread(target=self._loop_log, name="hitl-log", daemon=True), |
| | | ] |
| | |
| | | self._sim_time += timedelta(seconds=dt) |
| | | time.sleep(0.005) |
| | | |
| | | def _loop_gprmi(self): |
| | | def _loop_nav(self): |
| | | period = 0.1 # 10 Hz |
| | | while self._running.is_set(): |
| | | start = time.perf_counter() |
| | | sentence, gps_time_s = self._build_gprmi_sentence() |
| | | if sentence: |
| | | self.uart2.write(sentence) |
| | | self._log_ascii("PY->STM32 UART2 GPFMI", sentence, gps_time_s=gps_time_s, source_rank=0) |
| | | frame, gps_time_s = self._build_nav_frame() |
| | | if frame: |
| | | self.uart2.write(frame) |
| | | self._log_binary("PY->STM32 UART2 IM23A_NAV", frame, gps_time_s=gps_time_s, source_rank=0) |
| | | self._sleep_remaining(start, period) |
| | | |
| | | def _loop_gpimu(self): |
| | | def _loop_imu(self): |
| | | period = 0.01 # 100 Hz |
| | | while self._running.is_set(): |
| | | start = time.perf_counter() |
| | | sentence, gps_time_s = self._build_gpimu_sentence() |
| | | if sentence: |
| | | self.uart2.write(sentence) |
| | | self._log_ascii("PY->STM32 UART2 GPIMU", sentence, gps_time_s=gps_time_s, source_rank=0) |
| | | frame, gps_time_s = self._build_imu_frame() |
| | | if frame: |
| | | self.uart2.write(frame) |
| | | self._log_binary("PY->STM32 UART2 IM23A_IMU", frame, gps_time_s=gps_time_s, source_rank=0) |
| | | self._sleep_remaining(start, period) |
| | | |
| | | def _loop_control(self): |
| | |
| | | gps_time_s=_ascii_timestamp_to_seconds(ctrl.timestamp_ms), |
| | | source_rank=1, |
| | | ) |
| | | self._apply_ascii_control(ctrl) |
| | | handled = True |
| | | pose = decode_pose_status(msg) |
| | | if pose and self.on_pose_status: |
| | |
| | | # ------------------------------------------------------------------ # |
| | | # 构造帧 |
| | | # ------------------------------------------------------------------ # |
| | | def _build_gprmi_sentence(self) -> tuple[bytes | None, float]: |
| | | def _build_nav_frame(self) -> tuple[bytes | None, float]: |
| | | state, timestamp = self._snapshot() |
| | | lat, lon, alt = geo.enu_to_lla(state.east, state.north, state.up, self.origin) |
| | | heading_nav = geo.heading_math_to_nav(state.heading) |
| | | gps_time_s = _seconds_of_day(timestamp) |
| | | return ( |
| | | build_gprmi_sentence( |
| | | timestamp=timestamp, |
| | | lat_deg=lat, |
| | | lon_deg=lon, |
| | | alt_m=alt, |
| | | east_vel=state.east_velocity, |
| | | north_vel=state.north_velocity, |
| | | up_vel=state.up_velocity, |
| | | heading_deg=heading_nav, |
| | | pitch_deg=state.pitch_deg, |
| | | roll_deg=state.roll_deg, |
| | | baseline_m=self.config.baseline_distance, |
| | | ), |
| | | gps_time_s, |
| | | status_flags = int(self.config.position_quality) & 0xFF |
| | | frame = build_im23a_nav_frame( |
| | | timestamp=timestamp, |
| | | lat_deg=lat, |
| | | lon_deg=lon, |
| | | alt_m=alt, |
| | | east_vel=state.east_velocity, |
| | | north_vel=state.north_velocity, |
| | | up_vel=state.up_velocity, |
| | | heading_deg=heading_nav, |
| | | pitch_deg=state.pitch_deg, |
| | | roll_deg=state.roll_deg, |
| | | accel_bias=(0.0, 0.0, 0.0), |
| | | gyro_bias=(0.0, 0.0, 0.0), |
| | | temperature_c=state.temperature_c, |
| | | status_flags=status_flags, |
| | | ) |
| | | return frame, gps_time_s |
| | | |
| | | def _build_gpimu_sentence(self) -> tuple[bytes | None, float]: |
| | | def _build_imu_frame(self) -> tuple[bytes | None, float]: |
| | | state, timestamp = self._snapshot() |
| | | gps_time_s = _seconds_of_day(timestamp) |
| | | return ( |
| | | build_gpimu_sentence( |
| | | timestamp=timestamp, |
| | | accel_g=state.body_accel_g, |
| | | gyro_deg_s=state.gyro_deg_s, |
| | | temperature_c=state.temperature_c, |
| | | ), |
| | | gps_time_s, |
| | | frame = build_im23a_imu_frame( |
| | | timestamp=timestamp, |
| | | accel_g=state.body_accel_g, |
| | | gyro_deg_s=state.gyro_deg_s, |
| | | ) |
| | | return frame, gps_time_s |
| | | |
| | | # ------------------------------------------------------------------ # |
| | | # 工具 |
| | |
| | | # ------------------------------------------------------------------ # |
| | | # 日志工具 |
| | | # ------------------------------------------------------------------ # |
| | | def _apply_ascii_control(self, ctrl: ControlStatus): |
| | | with self._state_lock: |
| | | self._target_linear = ctrl.forward_mps |
| | | self._target_angular = ctrl.turn_rate |
| | | if self.on_control: |
| | | self.on_control(ctrl.forward_mps, ctrl.turn_rate) |
| | | |
| | | def _log_ascii( |
| | | self, |
| | | prefix: str, |