""" HITL 仿真器 GUI:左侧地图显示路径/车辆,右侧显示状态与控制信息。 """ from __future__ import annotations import argparse import json import math import queue import sys import threading from pathlib import Path from typing import Dict, List, Optional, Tuple try: from PyQt5 import QtCore, QtGui, QtWidgets except Exception: # pragma: no cover - Qt 可能不存在 QtWidgets = None try: from serial.tools import list_ports except Exception: # pragma: no cover - serial 可能不存在 list_ports = None from .protocols import ControlStatus, PoseStatus, StackStatus, StateStatus from .simulator import HitlConfig, HitlSimulator, RunLogger DEFAULT_UART2_PORT = "COM11" DEFAULT_UART5_PORT = "COM17" DEFAULT_ORIGIN_GGA = "$GNGGA,080112.000,3949.8105069,N,11616.6876082,E,4,44,0.42,48.502,M,-6.684,M,1.0,0409*73" DEFAULT_ENU = (0.0, 0.0, 0.0) DEFAULT_HEADING_DEG = 0.0 DEFAULT_UART2_BAUD = 115200 DEFAULT_UART5_BAUD = 921600 RUN_LOG_PATH = Path(__file__).resolve().parent / "runlog.txt" def load_path_points(path_file: Path) -> List[Tuple[float, float]]: if not path_file.exists(): raise FileNotFoundError(str(path_file)) text = path_file.read_text(encoding="utf-8").strip() if not text: return [] points: List[Tuple[float, float]] = [] if path_file.suffix.lower() == ".json" or text.startswith("["): try: data = json.loads(text) for pt in data: if isinstance(pt, (list, tuple)) and len(pt) >= 2: points.append((float(pt[0]), float(pt[1]))) return points except json.JSONDecodeError: return [] for token in text.split(";"): token = token.strip() if not token: continue parts = token.split(",") if len(parts) < 2: continue try: points.append((float(parts[0]), float(parts[1]))) except ValueError: continue return points class HitlGuiBridge: def __init__(self): self.queue: queue.Queue = queue.Queue(maxsize=200) self._lock = threading.Lock() def publish(self, kind: str, data): with self._lock: self._enqueue((kind, data)) def reset(self): with self._lock: while not self.queue.empty(): try: self.queue.get_nowait() except queue.Empty: break def _enqueue(self, item): try: self.queue.put_nowait(item) except queue.Full: try: self.queue.get_nowait() except queue.Empty: pass try: self.queue.put_nowait(item) except queue.Full: pass class HitlDashboard(QtWidgets.QMainWindow): def __init__(self, simulator: HitlSimulator, bridge: HitlGuiBridge, path_points: List[Tuple[float, float]], initial_uart2: Optional[str], initial_uart5: Optional[str]): super().__init__() self.simulator = simulator self.bridge = bridge self.path_points = list(path_points or []) self.trail_points: List[Tuple[float, float]] = [] self.control_status: Optional[ControlStatus] = None self.pose_status: Optional[PoseStatus] = None self.state_status: Optional[StateStatus] = None self.stack_status: Dict[str, StackStatus] = {} self.setWindowTitle("HITL 仿真状态面板") self.resize(1280, 720) self._serial_open = False self._last_ports: List[str] = [] self._build_ui(initial_uart2, initial_uart5) self._init_scene() self.timer = QtCore.QTimer(self) self.timer.timeout.connect(self._drain_queue) self.timer.start(80) def _build_ui(self, initial_uart2: Optional[str], initial_uart5: Optional[str]): central = QtWidgets.QWidget() layout = QtWidgets.QHBoxLayout(central) self.setCentralWidget(central) self.scene = QtWidgets.QGraphicsScene(self) self.view = QtWidgets.QGraphicsView(self.scene) self.view.setRenderHint(QtGui.QPainter.Antialiasing, True) self.view.setDragMode(QtWidgets.QGraphicsView.ScrollHandDrag) self.view.setTransformationAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse) layout.addWidget(self.view, stretch=3) self._port_refresh_supported = list_ports is not None right_panel = QtWidgets.QWidget() right_layout = QtWidgets.QVBoxLayout(right_panel) right_layout.setContentsMargins(8, 0, 0, 0) layout.addWidget(right_panel, stretch=2) # Serial configuration serial_group = QtWidgets.QGroupBox("串口设置") serial_layout = QtWidgets.QGridLayout(serial_group) self.port2_combo = QtWidgets.QComboBox() self.port5_combo = QtWidgets.QComboBox() self.port_refresh_btn = QtWidgets.QPushButton("刷新") self.serial_toggle_btn = QtWidgets.QPushButton("打开串口") self.serial_status_label = QtWidgets.QLabel("状态: 未连接") self.serial_status_label.setStyleSheet("color: gray;") serial_layout.addWidget(QtWidgets.QLabel("UART2:"), 0, 0) serial_layout.addWidget(self.port2_combo, 0, 1) serial_layout.addWidget(QtWidgets.QLabel("UART5:"), 1, 0) serial_layout.addWidget(self.port5_combo, 1, 1) serial_layout.addWidget(self.port_refresh_btn, 0, 2, 2, 1) serial_layout.addWidget(self.serial_toggle_btn, 2, 0, 1, 3) serial_layout.addWidget(self.serial_status_label, 3, 0, 1, 3) right_layout.addWidget(serial_group) if list_ports is None: self.port_refresh_btn.setEnabled(False) self.port2_combo.setEditable(True) self.port5_combo.setEditable(True) # Path path_group = QtWidgets.QGroupBox("路径文件") path_layout = QtWidgets.QHBoxLayout(path_group) self.path_line = QtWidgets.QLineEdit() self.path_browse_btn = QtWidgets.QPushButton("浏览") self.path_load_btn = QtWidgets.QPushButton("加载") path_layout.addWidget(self.path_line) path_layout.addWidget(self.path_browse_btn) path_layout.addWidget(self.path_load_btn) right_layout.addWidget(path_group) # Origin / pose reset origin_group = QtWidgets.QGroupBox("原点/起点") grid = QtWidgets.QGridLayout(origin_group) self.origin_edit = QtWidgets.QLineEdit(DEFAULT_ORIGIN_GGA) self.origin_btn = QtWidgets.QPushButton("更新原点") grid.addWidget(QtWidgets.QLabel("GGA:"), 0, 0) grid.addWidget(self.origin_edit, 0, 1, 1, 2) grid.addWidget(self.origin_btn, 0, 3) labels = ["E (m)", "N (m)", "U (m)", "Heading (deg)"] self.pos_spin: List[QtWidgets.QDoubleSpinBox] = [] for idx, label in enumerate(labels): spin = QtWidgets.QDoubleSpinBox() if idx < 3: spin.setRange(-1000.0, 1000.0) spin.setDecimals(3) else: spin.setRange(-360.0, 360.0) spin.setDecimals(2) grid.addWidget(QtWidgets.QLabel(label), 1 + idx // 2, (idx % 2) * 2) grid.addWidget(spin, 1 + idx // 2, (idx % 2) * 2 + 1) self.pos_spin.append(spin) self.reset_pose_btn = QtWidgets.QPushButton("重置位置") grid.addWidget(self.reset_pose_btn, 3, 0, 1, 4) right_layout.addWidget(origin_group) # Status status_group = QtWidgets.QGroupBox("车辆状态") status_layout = QtWidgets.QVBoxLayout(status_group) self.info_label = QtWidgets.QLabel("等待数据...") self.info_label.setAlignment(QtCore.Qt.AlignTop | QtCore.Qt.AlignLeft) self.info_label.setStyleSheet("font-family: Consolas, 'Courier New'; font-size: 12px;") info_scroll = QtWidgets.QScrollArea() info_scroll.setWidgetResizable(True) info_scroll.setWidget(self.info_label) status_layout.addWidget(info_scroll) right_layout.addWidget(status_group, stretch=1) # Stack table stack_group = QtWidgets.QGroupBox("堆栈监测") stack_layout = QtWidgets.QVBoxLayout(stack_group) self.stack_table = QtWidgets.QTableWidget(0, 3) self.stack_table.setHorizontalHeaderLabels(["任务", "堆栈余量(word)", "剩余堆(byte)"]) self.stack_table.horizontalHeader().setStretchLastSection(True) stack_layout.addWidget(self.stack_table) right_layout.addWidget(stack_group) # Log log_group = QtWidgets.QGroupBox("串口日志") log_layout = QtWidgets.QVBoxLayout(log_group) self.log_view = QtWidgets.QPlainTextEdit() self.log_view.setReadOnly(True) log_layout.addWidget(self.log_view) right_layout.addWidget(log_group, stretch=1) # Scene items self.path_item = self.scene.addPath(QtGui.QPainterPath(), QtGui.QPen(QtGui.QColor("gray"), 0.8)) self.trail_item = self.scene.addPath(QtGui.QPainterPath(), QtGui.QPen(QtGui.QColor("blue"), 1.0)) self.robot_item = self.scene.addEllipse(-0.3, -0.3, 0.6, 0.6, QtGui.QPen(QtGui.QColor("red")), QtGui.QBrush(QtGui.QColor("red"))) self.heading_item = self.scene.addLine(0, 0, 0, 0, QtGui.QPen(QtGui.QColor("red"), 2)) self.target_item = self.scene.addEllipse(-0.2, -0.2, 0.4, 0.4, QtGui.QPen(QtGui.QColor("green")), QtGui.QBrush(QtGui.QColor("green"))) self.path_browse_btn.clicked.connect(self._browse_path) self.path_load_btn.clicked.connect(self._load_path) self.origin_btn.clicked.connect(self._update_origin) self.reset_pose_btn.clicked.connect(self._reset_position) self.port_refresh_btn.clicked.connect(self._refresh_serial_ports) self.serial_toggle_btn.clicked.connect(self._toggle_serial) self._refresh_serial_ports(initial_uart2, initial_uart5) self._set_controls_enabled(False) self._update_serial_ui() def _refresh_serial_ports(self, initial_uart2: Optional[str] = None, initial_uart5: Optional[str] = None): ports: List[str] = [] if list_ports: try: ports = [p.device for p in list_ports.comports()] except Exception: ports = [] saved_uart2 = initial_uart2 or self.simulator.config.uart2_port or "" saved_uart5 = initial_uart5 or self.simulator.config.uart5_port or "" def _fill(combo: QtWidgets.QComboBox, saved: str): current = combo.currentText() combo.blockSignals(True) combo.clear() if ports: combo.addItems(ports) if saved and saved not in ports: combo.addItem(saved) target = saved or current if target: idx = combo.findText(target) if idx >= 0: combo.setCurrentIndex(idx) combo.blockSignals(False) _fill(self.port2_combo, saved_uart2) _fill(self.port5_combo, saved_uart5) self._last_ports = ports def _set_controls_enabled(self, enabled: bool): for widget in ( self.path_line, self.path_browse_btn, self.path_load_btn, self.origin_edit, self.origin_btn, self.reset_pose_btn, ): widget.setEnabled(enabled) self.path_line.setReadOnly(not enabled) self.origin_edit.setReadOnly(not enabled) def _update_serial_ui(self): self.serial_toggle_btn.setText("关闭串口" if self._serial_open else "打开串口") if self._serial_open: uart2 = self.simulator.uart2.port or "未知" uart5 = self.simulator.log_uart.port or "无" self.serial_status_label.setText(f"状态: 已连接 (UART2={uart2}, UART5={uart5})") self.serial_status_label.setStyleSheet("color: green;") else: self.serial_status_label.setText("状态: 未连接") self.serial_status_label.setStyleSheet("color: gray;") enable_ports = not self._serial_open self.port2_combo.setEnabled(enable_ports or self.port2_combo.isEditable()) self.port5_combo.setEnabled(enable_ports or self.port5_combo.isEditable()) if self._port_refresh_supported: self.port_refresh_btn.setEnabled(not self._serial_open) def _toggle_serial(self): if self._serial_open: self._close_serial() return uart2 = self.port2_combo.currentText().strip() if not uart2: QtWidgets.QMessageBox.warning(self, "串口", "请先选择 UART2 端口。") return uart5_text = self.port5_combo.currentText().strip() uart5 = uart5_text or None self.simulator.config.uart2_port = uart2 self.simulator.config.uart5_port = uart5 self.simulator.uart2.port = uart2 self.simulator.log_uart.port = uart5 try: self.simulator.start() except Exception as exc: # pragma: no cover - 依赖串口环境 self.simulator.stop() QtWidgets.QMessageBox.critical(self, "串口", f"打开串口失败: {exc}") return self._serial_open = True self.bridge.reset() self._set_controls_enabled(True) self._update_serial_ui() def _close_serial(self): self.simulator.stop() self.bridge.reset() self.trail_points.clear() self.control_status = None self.pose_status = None self.state_status = None self.stack_status.clear() self.log_view.clear() self.info_label.setText("等待数据...") self.stack_table.setRowCount(0) self._serial_open = False self._set_controls_enabled(False) self._update_serial_ui() self._refresh_view() def closeEvent(self, event): if self._serial_open: self._close_serial() super().closeEvent(event) def _browse_path(self): file_name, _ = QtWidgets.QFileDialog.getOpenFileName(self, "选择路径文件", str(Path.cwd()), "路径文件 (*.txt *.json);;所有文件 (*)") if file_name: self.path_line.setText(file_name) def _load_path(self): file_text = self.path_line.text().strip() if not file_text: QtWidgets.QMessageBox.warning(self, "路径", "请先选择路径文件。") return try: new_points = load_path_points(Path(file_text)) except Exception as exc: QtWidgets.QMessageBox.warning(self, "路径", f"加载失败: {exc}") return if not new_points: QtWidgets.QMessageBox.warning(self, "路径", "未解析到有效路径点。") return self.path_points = new_points self._init_scene() QtWidgets.QMessageBox.information(self, "路径", f"已加载 {len(new_points)} 个路径点。") def _update_origin(self): gga = self.origin_edit.text().strip() if not gga: QtWidgets.QMessageBox.warning(self, "原点", "请输入 GGA 字符串。") return if self.simulator.update_origin(gga): QtWidgets.QMessageBox.information(self, "原点", "原点已更新。") else: QtWidgets.QMessageBox.warning(self, "原点", "原点更新失败,请检查格式。") def _reset_position(self): east = self.pos_spin[0].value() north = self.pos_spin[1].value() up = self.pos_spin[2].value() heading = self.pos_spin[3].value() self.simulator.reset_state(east, north, up, heading) QtWidgets.QMessageBox.information(self, "位置", "已重置仿真状态。") def _init_scene(self): if not self.path_points: self.scene.setSceneRect(-10, -10, 20, 20) return path = QtGui.QPainterPath() first = True xs = [] ys = [] for px, py in self.path_points: xs.append(px) ys.append(-py) if first: path.moveTo(px, -py) first = False else: path.lineTo(px, -py) self.path_item.setPath(path) rect = QtCore.QRectF(min(xs) - 2, min(ys) - 2, (max(xs) - min(xs)) + 4, (max(ys) - min(ys)) + 4) self.scene.setSceneRect(rect) def _append_log(self, text: str): self.log_view.appendPlainText(text) self.log_view.verticalScrollBar().setValue(self.log_view.verticalScrollBar().maximum()) def _drain_queue(self): updated = False while True: try: kind, data = self.bridge.queue.get_nowait() except queue.Empty: break if kind == "control": self.control_status = data elif kind == "pose": self.pose_status = data self.trail_points.append((data.east, data.north)) if len(self.trail_points) > 2000: self.trail_points.pop(0) elif kind == "state": self.state_status = data elif kind == "stack": self.stack_status[data.task_name] = data elif kind == "log": self._append_log(data) updated = True if updated: self._refresh_view() def _refresh_view(self): info_lines = [] if self.pose_status: pose = self.pose_status info_lines += [ "位置 (ENU, m):", f" E: {pose.east:+7.3f}", f" N: {pose.north:+7.3f}", f" U: {pose.up:+7.3f}", "", "姿态 (deg):", f" Heading: {pose.heading_deg:+7.2f}", f" Pitch : {pose.pitch_deg:+7.2f}", f" Roll : {pose.roll_deg:+7.2f}", f" Target : ({pose.target_east:+6.2f}, {pose.target_north:+6.2f})", ] if self.control_status: ctrl = self.control_status info_lines += [ "", "控制量:", f" Forward : {ctrl.forward_mps:+6.3f} m/s", f" Turn : {ctrl.turn_rate:+6.3f} rad/s", f" PWM : steer={ctrl.steering_pwm} throttle={ctrl.throttle_pwm}", f" Freq : {ctrl.freq_hz:6.2f} Hz", f" Stage : {ctrl.stage}", ] if self.state_status: st = self.state_status info_lines += [ "", f"算法状态: {st.stage}", f" XTE : {st.cross_track_error:+6.3f} m", f" Heading: {st.heading_error_deg:+6.2f} deg", ] if self.stack_status: info_lines += ["", "堆栈监测:"] for item in self.stack_status.values(): info_lines.append( f" {item.task_name}: stack={item.stack_high_water} words heap={item.heap_free_bytes} B" ) if not info_lines: info_lines = ["等待数据..."] self.info_label.setText("\n".join(info_lines)) self._update_scene_items() self._refresh_stack_table() def _refresh_stack_table(self): self.stack_table.setRowCount(len(self.stack_status)) for row, status in enumerate(self.stack_status.values()): self.stack_table.setItem(row, 0, QtWidgets.QTableWidgetItem(status.task_name)) self.stack_table.setItem(row, 1, QtWidgets.QTableWidgetItem(str(status.stack_high_water))) self.stack_table.setItem(row, 2, QtWidgets.QTableWidgetItem(str(status.heap_free_bytes))) def _update_scene_items(self): if self.trail_points: trail = QtGui.QPainterPath() first = True for x, y in self.trail_points[-1500:]: scene_y = -y if first: trail.moveTo(x, scene_y) first = False else: trail.lineTo(x, scene_y) self.trail_item.setPath(trail) if self.pose_status: pose = self.pose_status x = pose.east y = -pose.north self.robot_item.setRect(x - 0.3, y - 0.3, 0.6, 0.6) heading_rad = math.radians(pose.heading_deg) dx = math.cos(heading_rad) dy = math.sin(heading_rad) self.heading_item.setLine(x, y, x + dx, y - dy) self.target_item.setRect(pose.target_east - 0.2, -pose.target_north - 0.2, 0.4, 0.4) self.view.centerOn(x, y) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="HITL 仿真器 GUI") parser.add_argument("--uart2", default=DEFAULT_UART2_PORT, help="STM32 UART2 串口号") parser.add_argument("--uart5", default=DEFAULT_UART5_PORT, help="STM32 UART5 串口号") parser.add_argument("--origin", default=DEFAULT_ORIGIN_GGA, help="GGA 原点") parser.add_argument("--east", type=float, default=DEFAULT_ENU[0], help="初始 E (m)") parser.add_argument("--north", type=float, default=DEFAULT_ENU[1], help="初始 N (m)") parser.add_argument("--up", type=float, default=DEFAULT_ENU[2], help="初始 U (m)") parser.add_argument("--heading", type=float, default=DEFAULT_HEADING_DEG, help="初始航向 (deg)") parser.add_argument("--gps-baud", type=int, default=DEFAULT_UART2_BAUD, help="UART2 波特率") parser.add_argument("--log-baud", type=int, default=DEFAULT_UART5_BAUD, help="UART5 波特率") parser.add_argument("--path", help="初始路径文件 (.json/.txt)") parser.add_argument("--no-gui", action="store_true", help="不启动 Qt GUI,仅打印数据") return parser.parse_args() def main(): args = parse_args() cfg = HitlConfig( uart2_port=args.uart2, uart5_port=args.uart5, origin_gga=args.origin, initial_enu=(args.east, args.north, args.up), initial_heading_deg=args.heading, gps_baudrate=args.gps_baud, log_baudrate=args.log_baud, ) run_logger = RunLogger(RUN_LOG_PATH) sim = HitlSimulator(cfg, run_logger=run_logger) bridge = HitlGuiBridge() def publish(kind, data): bridge.publish(kind, data) sim.on_control_status = lambda status: publish("control", status) sim.on_pose_status = lambda status: publish("pose", status) sim.on_state_status = lambda status: publish("state", status) sim.on_stack_status = lambda status: publish("stack", status) sim.on_log = lambda line: publish("log", line) path_points: List[Tuple[float, float]] = [] if args.path: try: path_points = load_path_points(Path(args.path)) except Exception as exc: print(f"[WARN] 路径加载失败: {exc}") if QtWidgets is None or args.no_gui: try: sim.start() print("[INFO] HITL 仿真器已启动 (无 GUI 模式)。按 Ctrl+C 退出。") while True: try: kind, data = bridge.queue.get(timeout=1.0) except queue.Empty: continue if kind == "log": print(data) elif kind == "control": print( f"[CTRL] stage={data.stage} " f"F={data.forward_mps:+.2f} m/s T={data.turn_rate:+.2f} rad/s " f"freq={data.freq_hz:.2f} Hz" ) except KeyboardInterrupt: print("\n[INFO] 用户中断。") finally: sim.stop() run_logger.close() print("[INFO] 仿真器已停止。") return app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([]) dashboard = HitlDashboard(sim, bridge, path_points, args.uart2, args.uart5) if args.path: dashboard.path_line.setText(args.path) dashboard.show() try: app.exec_() finally: if dashboard._serial_open: dashboard._close_serial() sim.stop() run_logger.close() print("[INFO] 仿真器已停止。") if __name__ == "__main__": sys.exit(main())