""" HITL 仿真器 GUI:左侧地图显示路径/车辆,右侧显示状态与控制信息。 """ from __future__ import annotations import argparse import json import math import queue import sys import threading from pathlib import Path from typing import Dict, List, Optional, Tuple try: from PyQt5 import QtCore, QtGui, QtWidgets except Exception: # pragma: no cover - Qt 可能不存在 QtWidgets = None try: from serial.tools import list_ports except Exception: # pragma: no cover - serial 可能不存在 list_ports = None from .protocols import ControlStatus, PoseStatus, StackStatus, StateStatus, nmea_checksum from .simulator import HitlConfig, HitlSimulator, RunLogger DEFAULT_UART2_PORT = "COM11" DEFAULT_UART5_PORT = "COM17" DEFAULT_ORIGIN_GGA = "$GNGGA,080112.000,3949.8105069,N,11616.6876082,E,4,44,0.42,48.502,M,-6.684,M,1.0,0409*73" DEFAULT_ENU = (0.0, 0.0, 0.0) DEFAULT_HEADING_DEG = 0.0 DEFAULT_UART2_BAUD = 115200 DEFAULT_UART5_BAUD = 921600 BASE_DIR = Path(__file__).resolve().parent RUN_LOG_PATH = BASE_DIR / "runlog.txt" PATH_TEXT_FILE = BASE_DIR / "path_text_input.txt" PATH_EXPORT_FILE = PATH_TEXT_FILE.with_suffix(".c") STATE_FILE = BASE_DIR / "dashboard_state.json" def load_path_points(path_file: Path) -> List[Tuple[float, float]]: if not path_file.exists(): raise FileNotFoundError(str(path_file)) text = path_file.read_text(encoding="utf-8").strip() if not text: return [] points: List[Tuple[float, float]] = [] if path_file.suffix.lower() == ".json" or text.startswith("["): try: data = json.loads(text) for pt in data: if isinstance(pt, (list, tuple)) and len(pt) >= 2: points.append((float(pt[0]), float(pt[1]))) return points except json.JSONDecodeError: return [] for token in text.split(";"): token = token.strip() if not token: continue parts = token.split(",") if len(parts) < 2: continue try: points.append((float(parts[0]), float(parts[1]))) except ValueError: continue return points def parse_path_text(text: str) -> List[Tuple[float, float]]: tokens = text.replace("\n", " ").split(";") points: List[Tuple[float, float]] = [] for token in tokens: token = token.strip() if not token: continue parts = token.split(",") if len(parts) < 2: continue try: x = float(parts[0].strip()) y = float(parts[1].strip()) points.append((x, y)) except ValueError: continue return points def format_path_for_mcu(points: List[Tuple[float, float]], decimals: int = 2) -> str: if not points: return "const float g_motion_path_xy[] = {};\n" fmt = f"{{:.{decimals}f}}" lines = ["const float g_motion_path_xy[] = {"] for idx, (x, y) in enumerate(points): comma = "," if idx < len(points) - 1 else "" lines.append(f" {fmt.format(x)}f, {fmt.format(y)}f{comma}") lines.append("};") return "\n".join(lines) def gga_to_short_origin(gga: str) -> str: text = (gga or "").strip() if not text: return "" body = text.split("*")[0] parts = [p.strip() for p in body.split(",")] if len(parts) < 6: return "" lat = parts[2] lat_dir = parts[3] lon = parts[4] lon_dir = parts[5] alt = parts[9] if len(parts) > 9 and parts[9] else "" short = f"{lat},{lat_dir},{lon},{lon_dir}" if alt: short += f",{alt}" return short def build_gga_from_short_origin(text: str, template: str) -> str: tokens = [t.strip() for t in text.replace("\n", " ").split(",") if t.strip()] if len(tokens) < 4: raise ValueError("原点格式应为 'lat,N,lon,E[,alt]'。") lat_dm = tokens[0] lat_dir = tokens[1].upper() lon_dm = tokens[2] lon_dir = tokens[3].upper() alt = tokens[4] if len(tokens) >= 5 else None if lat_dir not in ("N", "S") or lon_dir not in ("E", "W"): raise ValueError("方向必须是 N/S/E/W。") base = template or DEFAULT_ORIGIN_GGA newline = "" if base.endswith("\r\n"): base = base[:-2] newline = "\r\n" body = base.split("*")[0] fields = body.split(",") while len(fields) <= 13: fields.append("") fields[0] = fields[0] or "$GNGGA" fields[2] = lat_dm fields[3] = lat_dir fields[4] = lon_dm fields[5] = lon_dir if alt is not None: fields[9] = alt body = ",".join(fields) checksum = nmea_checksum(body) return f"{body}*{checksum}{newline}" class HitlGuiBridge: def __init__(self): self.queue: queue.Queue = queue.Queue(maxsize=200) self._lock = threading.Lock() def publish(self, kind: str, data): with self._lock: self._enqueue((kind, data)) def reset(self): with self._lock: while not self.queue.empty(): try: self.queue.get_nowait() except queue.Empty: break def _enqueue(self, item): try: self.queue.put_nowait(item) except queue.Full: try: self.queue.get_nowait() except queue.Empty: pass try: self.queue.put_nowait(item) except queue.Full: pass class ZoomableGraphicsView(QtWidgets.QGraphicsView): sceneMouseMoved = QtCore.pyqtSignal(float, float) """支持鼠标滚轮缩放 + 按住左键平移的视图""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.setRenderHint(QtGui.QPainter.Antialiasing, True) self.setTransformationAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse) self.setResizeAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse) self.setDragMode(QtWidgets.QGraphicsView.NoDrag) self.setCursor(QtCore.Qt.OpenHandCursor) self._zoom_factor = 1.15 # 每次滚轮的缩放因子 self._panning = False self._pan_last_pos = QtCore.QPoint() self.on_user_pan = None # 由外部注入的回调 def wheelEvent(self, event: QtGui.QWheelEvent): delta = event.angleDelta().y() scale_factor = self._zoom_factor if delta > 0 else 1.0 / self._zoom_factor self.scale(scale_factor, scale_factor) def mousePressEvent(self, event: QtGui.QMouseEvent): if event.button() == QtCore.Qt.LeftButton: self._panning = True self._pan_last_pos = event.pos() self.setCursor(QtCore.Qt.ClosedHandCursor) event.accept() return super().mousePressEvent(event) def mouseMoveEvent(self, event: QtGui.QMouseEvent): if self._panning: scene_pos = self.mapToScene(event.pos()) last_scene_pos = self.mapToScene(self._pan_last_pos) delta = scene_pos - last_scene_pos self._pan_last_pos = event.pos() self.translate(-delta.x(), -delta.y()) if self.on_user_pan: self.on_user_pan() event.accept() return self.sceneMouseMoved.emit(*self._scene_coords(event.pos())) super().mouseMoveEvent(event) def _scene_coords(self, view_pos: QtCore.QPoint) -> Tuple[float, float]: scene_pos = self.mapToScene(view_pos) return scene_pos.x(), scene_pos.y() def mouseReleaseEvent(self, event: QtGui.QMouseEvent): if event.button() == QtCore.Qt.LeftButton and self._panning: self._panning = False self.setCursor(QtCore.Qt.OpenHandCursor) event.accept() return super().mouseReleaseEvent(event) class DashboardStateStore: def __init__(self, path: Path): self.path = Path(path) self.data: Dict[str, object] = {} self._load() def _load(self): if not self.path.exists(): self.data = {} return try: self.data = json.loads(self.path.read_text(encoding="utf-8")) except Exception: self.data = {} def save(self): try: self.path.write_text(json.dumps(self.data, ensure_ascii=False, indent=2), encoding="utf-8") except Exception: pass def get_pose(self) -> Optional[Tuple[float, float, float, float]]: pose = self.data.get("pose") if isinstance(pose, list) and len(pose) == 4: try: return tuple(float(v) for v in pose) # type: ignore except (TypeError, ValueError): return None return None def set_pose(self, east: float, north: float, up: float, heading: float): self.data["pose"] = [east, north, up, heading] self.save() def get_origin(self) -> Optional[str]: origin = self.data.get("origin") if isinstance(origin, str): return origin return None def set_origin(self, origin_str: str): self.data["origin"] = origin_str self.save() class HitlDashboard(QtWidgets.QMainWindow): def __init__(self, simulator: HitlSimulator, bridge: HitlGuiBridge, path_points: List[Tuple[float, float]], initial_uart2: Optional[str], initial_uart5: Optional[str]): super().__init__() self.simulator = simulator self.bridge = bridge self.path_points = list(path_points or []) self.trail_points: List[Tuple[float, float]] = [] self.control_status: Optional[ControlStatus] = None self.pose_status: Optional[PoseStatus] = None self.state_status: Optional[StateStatus] = None self.stack_status: Dict[str, StackStatus] = {} self._auto_follow = True self.setWindowTitle("HITL 仿真状态面板") self.resize(1280, 720) self._serial_open = False self._last_ports: List[str] = [] self.state_store = DashboardStateStore(STATE_FILE) self._build_ui(initial_uart2, initial_uart5) self._load_saved_path_text() if not self.path_text.toPlainText().strip() and self.path_points: self._update_path_text_from_points(self.path_points) self._load_saved_pose() self._init_scene() self.timer = QtCore.QTimer(self) self.timer.timeout.connect(self._drain_queue) self.timer.start(80) def _build_ui(self, initial_uart2: Optional[str], initial_uart5: Optional[str]): central = QtWidgets.QWidget() layout = QtWidgets.QHBoxLayout(central) self.setCentralWidget(central) self.scene = QtWidgets.QGraphicsScene(self) left_panel = QtWidgets.QWidget() left_layout = QtWidgets.QVBoxLayout(left_panel) left_layout.setContentsMargins(0, 0, 0, 0) left_layout.setSpacing(4) self.view = ZoomableGraphicsView(self.scene) self.view.on_user_pan = self._handle_user_pan self.view.sceneMouseMoved.connect(self._update_mouse_pos) left_layout.addWidget(self.view, stretch=1) # Bottom bar for map controls bottom_bar = QtWidgets.QWidget() bottom_layout = QtWidgets.QHBoxLayout(bottom_bar) bottom_layout.setContentsMargins(0, 4, 0, 0) self.follow_checkbox = QtWidgets.QCheckBox("自动跟随车辆") self.follow_checkbox.setChecked(True) self.follow_checkbox.toggled.connect(self._on_follow_toggled) bottom_layout.addWidget(self.follow_checkbox) self.clear_trail_btn = QtWidgets.QPushButton("清除轨迹") self.clear_trail_btn.clicked.connect(self._clear_trail) bottom_layout.addWidget(self.clear_trail_btn) bottom_layout.addStretch(1) self.mouse_pos_label = QtWidgets.QLabel("E: 0.00 N: 0.00") self.mouse_pos_label.setStyleSheet("color:#222; background:rgba(255,255,255,190); padding:2px 6px; border-radius:4px;") bottom_layout.addWidget(self.mouse_pos_label) left_layout.addWidget(bottom_bar) layout.addWidget(left_panel, stretch=3) self._port_refresh_supported = list_ports is not None right_panel = QtWidgets.QWidget() right_layout = QtWidgets.QVBoxLayout(right_panel) right_layout.setContentsMargins(8, 0, 0, 0) layout.addWidget(right_panel, stretch=2) # Serial configuration serial_group = QtWidgets.QGroupBox("串口设置") serial_layout = QtWidgets.QGridLayout(serial_group) self.port2_combo = QtWidgets.QComboBox() self.port5_combo = QtWidgets.QComboBox() self.port_refresh_btn = QtWidgets.QPushButton("刷新") self.serial_toggle_btn = QtWidgets.QPushButton("打开串口") self.serial_status_label = QtWidgets.QLabel("状态: 未连接") self.serial_status_label.setStyleSheet("color: gray;") serial_layout.addWidget(QtWidgets.QLabel("UART2:"), 0, 0) serial_layout.addWidget(self.port2_combo, 0, 1) serial_layout.addWidget(QtWidgets.QLabel("UART5:"), 1, 0) serial_layout.addWidget(self.port5_combo, 1, 1) serial_layout.addWidget(self.port_refresh_btn, 0, 2, 2, 1) serial_layout.addWidget(self.serial_toggle_btn, 2, 0, 1, 3) serial_layout.addWidget(self.serial_status_label, 3, 0, 1, 3) right_layout.addWidget(serial_group) if list_ports is None: self.port_refresh_btn.setEnabled(False) self.port2_combo.setEditable(True) self.port5_combo.setEditable(True) # Path text input path_group = QtWidgets.QGroupBox("路径轨迹") path_layout = QtWidgets.QVBoxLayout(path_group) self.path_text = QtWidgets.QPlainTextEdit() self.path_text.setPlaceholderText("示例:23.36,-42.93;29.26,-52.99; ... ;") self.path_text.setWordWrapMode(QtGui.QTextOption.NoWrap) button_row = QtWidgets.QHBoxLayout() self.path_load_btn = QtWidgets.QPushButton("加载到地图") self.path_export_btn = QtWidgets.QPushButton("导出 MCU 格式") button_row.addWidget(self.path_load_btn) button_row.addWidget(self.path_export_btn) path_layout.addWidget(self.path_text) path_layout.addLayout(button_row) right_layout.addWidget(path_group) # Origin / pose reset origin_group = QtWidgets.QGroupBox("原点/起点") grid = QtWidgets.QGridLayout(origin_group) default_origin_short = gga_to_short_origin(self.simulator.config.origin_gga) self.origin_text = QtWidgets.QLineEdit(default_origin_short) self.origin_text.setPlaceholderText("3949.9120,N,11616.8544,E[,alt]") self.origin_btn = QtWidgets.QPushButton("更新原点") self.copy_origin_btn = QtWidgets.QPushButton("复制宏定义") # 新增按钮 grid.addWidget(QtWidgets.QLabel("坐标 (lat,N,lon,E):"), 0, 0, 1, 2) grid.addWidget(self.origin_text, 0, 2, 1, 2) grid.addWidget(self.origin_btn, 0, 4) grid.addWidget(self.copy_origin_btn, 0, 5) # 添加到布局 labels = ["E (m)", "N (m)", "U (m)", "Heading (deg)"] self.pos_spin: List[QtWidgets.QDoubleSpinBox] = [] for idx, label in enumerate(labels): spin = QtWidgets.QDoubleSpinBox() if idx < 3: spin.setRange(-1000.0, 1000.0) spin.setDecimals(3) else: spin.setRange(-360.0, 360.0) spin.setDecimals(2) grid.addWidget(QtWidgets.QLabel(label), 1 + idx // 2, (idx % 2) * 2) grid.addWidget(spin, 1 + idx // 2, (idx % 2) * 2 + 1) self.pos_spin.append(spin) self.reset_pose_btn = QtWidgets.QPushButton("重置位置") grid.addWidget(self.reset_pose_btn, 3, 0, 1, 4) right_layout.addWidget(origin_group) # Status status_group = QtWidgets.QGroupBox("车辆状态") status_layout = QtWidgets.QVBoxLayout(status_group) self.info_label = QtWidgets.QLabel("等待数据...") self.info_label.setAlignment(QtCore.Qt.AlignTop | QtCore.Qt.AlignLeft) self.info_label.setStyleSheet("font-family: Consolas, 'Courier New'; font-size: 14px;") info_scroll = QtWidgets.QScrollArea() info_scroll.setWidgetResizable(True) info_scroll.setWidget(self.info_label) info_scroll.setMinimumHeight(500) # 进一步增加最小高度 status_layout.addWidget(info_scroll) right_layout.addWidget(status_group, stretch=4) # 进一步增加布局权重,从2增加到4 # Stack table stack_group = QtWidgets.QGroupBox("堆栈监测") stack_layout = QtWidgets.QVBoxLayout(stack_group) self.stack_table = QtWidgets.QTableWidget(0, 3) self.stack_table.setHorizontalHeaderLabels(["任务", "堆栈余量(word)", "剩余堆(byte)"]) self.stack_table.horizontalHeader().setStretchLastSection(True) stack_layout.addWidget(self.stack_table) right_layout.addWidget(stack_group) # Log log_group = QtWidgets.QGroupBox("串口日志") log_layout = QtWidgets.QVBoxLayout(log_group) self.log_view = QtWidgets.QPlainTextEdit() self.log_view.setReadOnly(True) log_layout.addWidget(self.log_view) right_layout.addWidget(log_group, stretch=1) # Scene items self.path_item = self.scene.addPath(QtGui.QPainterPath(), QtGui.QPen(QtGui.QColor("gray"), 0)) self.trail_item = self.scene.addPath(QtGui.QPainterPath(), QtGui.QPen(QtGui.QColor("blue"), 0)) self.robot_size = 0.25 self.robot_item = self.scene.addEllipse( -self.robot_size, -self.robot_size, self.robot_size * 2, self.robot_size * 2, QtGui.QPen(QtGui.QColor("#d11d29"), 0), QtGui.QBrush(QtGui.QColor("#ff4b5c")), ) arrow_pen = QtGui.QPen(QtGui.QColor("#ff8c00"), 0) arrow_pen.setJoinStyle(QtCore.Qt.RoundJoin) self.heading_item = self.scene.addPath( QtGui.QPainterPath(), arrow_pen, QtGui.QBrush(QtGui.QColor("#ff8c00")), ) self.target_radius = 0.08 self.target_item = self.scene.addEllipse( -self.target_radius, -self.target_radius, self.target_radius * 2, self.target_radius * 2, QtGui.QPen(QtGui.QColor("green")), QtGui.QBrush(QtGui.QColor("green")), ) self.path_load_btn.clicked.connect(self._load_path_from_text) self.path_export_btn.clicked.connect(self._export_path_to_mcu) self.origin_btn.clicked.connect(self._update_origin) self.copy_origin_btn.clicked.connect(self._copy_origin_macro) # 绑定事件 self.reset_pose_btn.clicked.connect(self._reset_position) self.port_refresh_btn.clicked.connect(self._refresh_serial_ports) self.serial_toggle_btn.clicked.connect(self._toggle_serial) self._refresh_serial_ports(initial_uart2, initial_uart5) self._set_controls_enabled(False) self._update_serial_ui() def _copy_origin_macro(self): text = self.origin_text.text().strip() if not text: QtWidgets.QMessageBox.warning(self, "复制", "原点输入框为空") return # 格式: lat_val,N,lon_val,E,alt # 例如: 3949.9120,N,11616.8544,E,47.5 parts = [p.strip() for p in text.split(",")] if len(parts) < 4: QtWidgets.QMessageBox.warning(self, "复制", "原点格式错误,需至少包含经纬度") return try: # 解析 DDMM.MMMM 格式转 DD.DDDD # 纬度 lat_raw = parts[0] lat_deg_int = int(float(lat_raw) / 100) lat_min = float(lat_raw) % 100 lat_dd = lat_deg_int + lat_min / 60.0 if parts[1].upper() == 'S': lat_dd = -lat_dd # 经度 lon_raw = parts[2] lon_deg_int = int(float(lon_raw) / 100) lon_min = float(lon_raw) % 100 lon_dd = lon_deg_int + lon_min / 60.0 if parts[3].upper() == 'W': lon_dd = -lon_dd # 高度 alt = "0.0" if len(parts) >= 5: alt = parts[4] macro = ( f"#define MC_CFG_ORIGIN_LAT_DEG ({lat_dd:.6f})\n" f"#define MC_CFG_ORIGIN_LON_DEG ({lon_dd:.6f})\n" f"#define MC_CFG_ORIGIN_ALT_M ({alt})" ) clipboard = QtWidgets.QApplication.clipboard() clipboard.setText(macro) QtWidgets.QMessageBox.information(self, "复制", "已复制宏定义到剪贴板!") except ValueError as e: QtWidgets.QMessageBox.warning(self, "复制", f"数值解析错误: {e}") def _handle_user_pan(self): if self._auto_follow: self.follow_checkbox.blockSignals(True) self.follow_checkbox.setChecked(False) self.follow_checkbox.blockSignals(False) self._auto_follow = False def _on_follow_toggled(self, checked: bool): self._auto_follow = checked if checked and self.pose_status: self.view.centerOn(self.pose_status.east, -self.pose_status.north) def _clear_trail(self): """清除地图上的轨迹点""" self.trail_points = [] # Update scene to remove trail self._update_scene() def _update_mouse_pos(self, east: float, scene_y: float): north = -scene_y self.mouse_pos_label.setText(f"E: {east: .2f} N: {north: .2f}") def _refresh_serial_ports(self, initial_uart2: Optional[str] = None, initial_uart5: Optional[str] = None): ports: List[str] = [] if list_ports: try: ports = [p.device for p in list_ports.comports()] except Exception: ports = [] saved_uart2 = initial_uart2 or self.simulator.config.uart2_port or "" saved_uart5 = initial_uart5 or self.simulator.config.uart5_port or "" def _fill(combo: QtWidgets.QComboBox, saved: str): current = combo.currentText() combo.blockSignals(True) combo.clear() if ports: combo.addItems(ports) if saved and saved not in ports: combo.addItem(saved) target = saved or current if target: idx = combo.findText(target) if idx >= 0: combo.setCurrentIndex(idx) combo.blockSignals(False) _fill(self.port2_combo, saved_uart2) _fill(self.port5_combo, saved_uart5) self._last_ports = ports def _set_controls_enabled(self, enabled: bool): self.reset_pose_btn.setEnabled(enabled) self.origin_text.setReadOnly(False) self.origin_btn.setEnabled(True) def _load_saved_path_text(self): if PATH_TEXT_FILE.exists(): try: text = PATH_TEXT_FILE.read_text(encoding="utf-8") except Exception: text = "" self.path_text.setPlainText(text) points = parse_path_text(text) if points: self.path_points = points def _update_path_text_from_points(self, points: List[Tuple[float, float]]): if not points: self.path_text.clear() return text = ";".join(f"{x:.2f},{y:.2f}" for x, y in points) + ";" self.path_text.setPlainText(text) def _save_path_text(self, text: str): try: PATH_TEXT_FILE.write_text(text.strip(), encoding="utf-8") except Exception as exc: QtWidgets.QMessageBox.warning(self, "路径", f"保存路径文本失败: {exc}") def _load_saved_pose(self): pose = self.state_store.get_pose() if pose: self.pos_spin[0].setValue(pose[0]) self.pos_spin[1].setValue(pose[1]) self.pos_spin[2].setValue(pose[2]) self.pos_spin[3].setValue(pose[3]) def _load_path_from_text(self): text = self.path_text.toPlainText().strip() points = parse_path_text(text) if not points: QtWidgets.QMessageBox.warning(self, "路径", "未解析到有效的轨迹点,请检查输入格式。") return self.path_points = points self.trail_points.clear() self._save_path_text(text) self._init_scene() self._refresh_view() QtWidgets.QMessageBox.information(self, "路径", f"已加载 {len(points)} 个路径点。") def _export_path_to_mcu(self): text = self.path_text.toPlainText().strip() points = parse_path_text(text) if not points: QtWidgets.QMessageBox.warning(self, "导出", "没有可导出的路径点,请先加载路径。") return formatted = format_path_for_mcu(points) clipboard = QtWidgets.QApplication.clipboard() clipboard.setText(formatted) try: PATH_EXPORT_FILE.write_text(formatted, encoding="utf-8") except Exception as exc: QtWidgets.QMessageBox.warning(self, "导出", f"写入文件失败: {exc}") QtWidgets.QMessageBox.information(self, "导出", "已复制到剪贴板,并写入:\n" + str(PATH_EXPORT_FILE)) def _update_serial_ui(self): self.serial_toggle_btn.setText("关闭串口" if self._serial_open else "打开串口") if self._serial_open: uart2 = self.simulator.uart2.port or "未知" uart5 = self.simulator.log_uart.port or "无" self.serial_status_label.setText(f"状态: 已连接 (UART2={uart2}, UART5={uart5})") self.serial_status_label.setStyleSheet("color: green;") else: self.serial_status_label.setText("状态: 未连接") self.serial_status_label.setStyleSheet("color: gray;") enable_ports = not self._serial_open self.port2_combo.setEnabled(enable_ports or self.port2_combo.isEditable()) self.port5_combo.setEnabled(enable_ports or self.port5_combo.isEditable()) if self._port_refresh_supported: self.port_refresh_btn.setEnabled(not self._serial_open) def _toggle_serial(self): if self._serial_open: self._close_serial() return uart2 = self.port2_combo.currentText().strip() if not uart2: QtWidgets.QMessageBox.warning(self, "串口", "请先选择 UART2 端口。") return uart5_text = self.port5_combo.currentText().strip() uart5 = uart5_text or None self.simulator.config.uart2_port = uart2 self.simulator.config.uart5_port = uart5 self.simulator.uart2.port = uart2 self.simulator.log_uart.port = uart5 try: self.simulator.start() except Exception as exc: # pragma: no cover - 依赖串口环境 self.simulator.stop() QtWidgets.QMessageBox.critical(self, "串口", f"打开串口失败: {exc}") return self._serial_open = True self.bridge.reset() self._set_controls_enabled(True) self._update_serial_ui() def _close_serial(self): self.simulator.stop() self.bridge.reset() self.trail_points.clear() self.control_status = None self.pose_status = None self.state_status = None self.stack_status.clear() self.log_view.clear() self.info_label.setText("等待数据...") self.stack_table.setRowCount(0) self._serial_open = False self._set_controls_enabled(False) self._update_serial_ui() self._refresh_view() def closeEvent(self, event): if self._serial_open: self._close_serial() super().closeEvent(event) def _update_origin(self): text = self.origin_text.text().strip() if not text: QtWidgets.QMessageBox.warning(self, "原点", "请输入坐标,格式:lat,N,lon,E[,alt]") return template = self.simulator.config.origin_gga or DEFAULT_ORIGIN_GGA try: gga = build_gga_from_short_origin(text, template) except ValueError as exc: QtWidgets.QMessageBox.warning(self, "原点", f"格式错误: {exc}") return self.simulator.update_origin(gga) self.simulator.config.origin_gga = gga self.origin_text.setText(gga_to_short_origin(gga)) self.state_store.set_origin(gga) QtWidgets.QMessageBox.information(self, "原点", "原点已更新。") def _reset_position(self): east = self.pos_spin[0].value() north = self.pos_spin[1].value() up = self.pos_spin[2].value() heading = self.pos_spin[3].value() self.simulator.reset_state(east, north, up, heading) self.state_store.set_pose(east, north, up, heading) QtWidgets.QMessageBox.information(self, "位置", "已重置仿真状态。") def _init_scene(self): if not self.path_points: self.scene.setSceneRect(-250, -250, 500, 500) return path = QtGui.QPainterPath() first = True xs = [] ys = [] for px, py in self.path_points: xs.append(px) ys.append(-py) if first: path.moveTo(px, -py) first = False else: path.lineTo(px, -py) self.path_item.setPath(path) min_x = min(xs) max_x = max(xs) min_y = min(ys) max_y = max(ys) width = max_x - min_x height = max_y - min_y pad_x = max(10.0, width * 0.75) pad_y = max(10.0, height * 0.75) scene_rect = QtCore.QRectF( min_x - pad_x, min_y - pad_y, (width if width > 0 else 1.0) + pad_x * 2.0, (height if height > 0 else 1.0) + pad_y * 2.0, ) self.scene.setSceneRect(scene_rect) self.view.fitInView(scene_rect, QtCore.Qt.KeepAspectRatio) def _append_log(self, text: str): self.log_view.appendPlainText(text) self.log_view.verticalScrollBar().setValue(self.log_view.verticalScrollBar().maximum()) def _drain_queue(self): updated = False while True: try: kind, data = self.bridge.queue.get_nowait() except queue.Empty: break if kind == "control": self.control_status = data elif kind == "pose": self.pose_status = data self.trail_points.append((data.east, data.north)) if len(self.trail_points) > 2000: self.trail_points.pop(0) elif kind == "state": self.state_status = data elif kind == "stack": self.stack_status[data.task_name] = data elif kind == "log": self._append_log(data) updated = True if updated: self._refresh_view() def _refresh_view(self): info_lines = [] if self.pose_status: pose = self.pose_status info_lines += [ "位置 (ENU, m):", f" E: {pose.east:+7.3f}", f" N: {pose.north:+7.3f}", f" U: {pose.up:+7.3f}", "", "姿态 (deg):", f" Heading: {pose.heading_deg:+7.2f}", f" Pitch : {pose.pitch_deg:+7.2f}", f" Roll : {pose.roll_deg:+7.2f}", f" Target : ({pose.target_east:+6.2f}, {pose.target_north:+6.2f})", ] if self.control_status: ctrl = self.control_status info_lines += [ "", "控制量:", f" Forward : {ctrl.forward_mps:+6.3f} m/s", f" Turn : {ctrl.turn_rate:+6.3f} rad/s", f" PWM : steer={ctrl.steering_pwm} throttle={ctrl.throttle_pwm}", f" Freq : {ctrl.freq_hz:6.2f} Hz", f" Stage : {ctrl.stage}", ] if self.state_status: st = self.state_status info_lines += [ "", f"算法状态: {st.stage}", f" XTE : {st.cross_track_error:+6.3f} m", f" Heading: {st.heading_error_deg:+6.2f} deg", ] if self.stack_status: info_lines += ["", "堆栈监测:"] for item in self.stack_status.values(): info_lines.append( f" {item.task_name}: stack={item.stack_high_water} words heap={item.heap_free_bytes} B" ) if not info_lines: info_lines = ["等待数据..."] self.info_label.setText("\n".join(info_lines)) self._update_scene_items() self._refresh_stack_table() def _refresh_stack_table(self): self.stack_table.setRowCount(len(self.stack_status)) for row, status in enumerate(self.stack_status.values()): self.stack_table.setItem(row, 0, QtWidgets.QTableWidgetItem(status.task_name)) self.stack_table.setItem(row, 1, QtWidgets.QTableWidgetItem(str(status.stack_high_water))) self.stack_table.setItem(row, 2, QtWidgets.QTableWidgetItem(str(status.heap_free_bytes))) def _update_scene_items(self): if self.trail_points: trail = QtGui.QPainterPath() first = True for x, y in self.trail_points[-1500:]: scene_y = -y if first: trail.moveTo(x, scene_y) first = False else: trail.lineTo(x, scene_y) self.trail_item.setPath(trail) if self.pose_status: pose = self.pose_status x = pose.east y = -pose.north size = self.robot_size self.robot_item.setRect(x - size, y - size, size * 2, size * 2) # 绘制航向角箭头(与车体分离的橙色指向箭头) # pose.heading_deg 为罗盘角(北=0°,顺时针为正),需转换到数学坐标(东=0°,逆时针为正) heading_rad = math.radians(90.0 - pose.heading_deg) dir_x = math.cos(heading_rad) dir_y = -math.sin(heading_rad) # 视图坐标 Y 向下,因此取反 perp_x = -dir_y perp_y = dir_x # 射线参数 offset = 0.25 # 箭头离车体中心的偏移,避免遮挡 shaft_length = 0.85 shaft_half_width = 0.07 head_length = 0.45 head_half_width = 0.28 start = ( x + dir_x * offset, y + dir_y * offset, ) shaft_end = ( start[0] + dir_x * shaft_length, start[1] + dir_y * shaft_length, ) tip = ( shaft_end[0] + dir_x * head_length, shaft_end[1] + dir_y * head_length, ) points = [ ( start[0] + perp_x * shaft_half_width, start[1] + perp_y * shaft_half_width, ), ( shaft_end[0] + perp_x * shaft_half_width, shaft_end[1] + perp_y * shaft_half_width, ), ( shaft_end[0] + perp_x * head_half_width, shaft_end[1] + perp_y * head_half_width, ), tip, ( shaft_end[0] - perp_x * head_half_width, shaft_end[1] - perp_y * head_half_width, ), ( shaft_end[0] - perp_x * shaft_half_width, shaft_end[1] - perp_y * shaft_half_width, ), ( start[0] - perp_x * shaft_half_width, start[1] - perp_y * shaft_half_width, ), ] arrow_path = QtGui.QPainterPath() arrow_path.moveTo(*points[0]) for pt in points[1:]: arrow_path.lineTo(*pt) arrow_path.closeSubpath() self.heading_item.setPath(arrow_path) target_size = self.target_radius self.target_item.setRect( pose.target_east - target_size, -pose.target_north - target_size, target_size * 2, target_size * 2, ) if self._auto_follow: self.view.centerOn(x, y) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="HITL 仿真器 GUI") parser.add_argument("--uart2", default=DEFAULT_UART2_PORT, help="STM32 UART2 串口号") parser.add_argument("--uart5", default=DEFAULT_UART5_PORT, help="STM32 UART5 串口号") parser.add_argument("--origin", default=DEFAULT_ORIGIN_GGA, help="GGA 原点") parser.add_argument("--east", type=float, default=DEFAULT_ENU[0], help="初始 E (m)") parser.add_argument("--north", type=float, default=DEFAULT_ENU[1], help="初始 N (m)") parser.add_argument("--up", type=float, default=DEFAULT_ENU[2], help="初始 U (m)") parser.add_argument("--heading", type=float, default=DEFAULT_HEADING_DEG, help="初始航向 (deg)") parser.add_argument("--gps-baud", type=int, default=DEFAULT_UART2_BAUD, help="UART2 波特率") parser.add_argument("--log-baud", type=int, default=DEFAULT_UART5_BAUD, help="UART5 波特率") parser.add_argument("--path", help="初始路径文件 (.json/.txt)") parser.add_argument("--no-gui", action="store_true", help="不启动 Qt GUI,仅打印数据") return parser.parse_args() def main(): args = parse_args() # 优先从 dashboard_state.json 加载保存的原点和初始位置 state_store = DashboardStateStore(STATE_FILE) saved_origin = state_store.get_origin() saved_pose = state_store.get_pose() origin_gga = saved_origin if saved_origin else args.origin if saved_pose: initial_enu = (saved_pose[0], saved_pose[1], saved_pose[2]) initial_heading = saved_pose[3] else: initial_enu = (args.east, args.north, args.up) initial_heading = args.heading cfg = HitlConfig( uart2_port=args.uart2, uart5_port=args.uart5, origin_gga=origin_gga, initial_enu=initial_enu, initial_heading_deg=initial_heading, gps_baudrate=args.gps_baud, log_baudrate=args.log_baud, ) run_logger = RunLogger(RUN_LOG_PATH) sim = HitlSimulator(cfg, run_logger=run_logger) bridge = HitlGuiBridge() def publish(kind, data): bridge.publish(kind, data) sim.on_control_status = lambda status: publish("control", status) sim.on_pose_status = lambda status: publish("pose", status) sim.on_state_status = lambda status: publish("state", status) sim.on_stack_status = lambda status: publish("stack", status) sim.on_log = lambda line: publish("log", line) path_points: List[Tuple[float, float]] = [] if args.path: try: path_points = load_path_points(Path(args.path)) except Exception as exc: print(f"[WARN] 路径加载失败: {exc}") if QtWidgets is None or args.no_gui: try: sim.start() print("[INFO] HITL 仿真器已启动 (无 GUI 模式)。按 Ctrl+C 退出。") while True: try: kind, data = bridge.queue.get(timeout=1.0) except queue.Empty: continue if kind == "log": print(data) elif kind == "control": print( f"[CTRL] stage={data.stage} " f"F={data.forward_mps:+.2f} m/s T={data.turn_rate:+.2f} rad/s " f"freq={data.freq_hz:.2f} Hz" ) except KeyboardInterrupt: print("\n[INFO] 用户中断。") finally: sim.stop() run_logger.close() print("[INFO] 仿真器已停止。") return app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([]) dashboard = HitlDashboard(sim, bridge, path_points, args.uart2, args.uart5) if args.path: dashboard.path_line.setText(args.path) dashboard.show() try: app.exec_() finally: if dashboard._serial_open: dashboard._close_serial() sim.stop() run_logger.close() print("[INFO] 仿真器已停止。") if __name__ == "__main__": sys.exit(main())