"""
|
HITL 仿真器 GUI:左侧地图显示路径/车辆,右侧显示状态与控制信息。
|
"""
|
|
from __future__ import annotations
|
|
import argparse
|
import json
|
import math
|
import queue
|
import sys
|
import threading
|
from pathlib import Path
|
from typing import Dict, List, Optional, Tuple
|
|
try:
|
from PyQt5 import QtCore, QtGui, QtWidgets
|
except Exception: # pragma: no cover - Qt 可能不存在
|
QtWidgets = None
|
|
try:
|
from serial.tools import list_ports
|
except Exception: # pragma: no cover - serial 可能不存在
|
list_ports = None
|
|
from .protocols import ControlStatus, PoseStatus, StackStatus, StateStatus, nmea_checksum
|
from .simulator import HitlConfig, HitlSimulator, RunLogger
|
|
DEFAULT_UART2_PORT = "COM11"
|
DEFAULT_UART5_PORT = "COM17"
|
DEFAULT_ORIGIN_GGA = "$GNGGA,080112.000,3949.8105069,N,11616.6876082,E,4,44,0.42,48.502,M,-6.684,M,1.0,0409*73"
|
DEFAULT_ENU = (0.0, 0.0, 0.0)
|
DEFAULT_HEADING_DEG = 0.0
|
DEFAULT_UART2_BAUD = 115200
|
DEFAULT_UART5_BAUD = 921600
|
BASE_DIR = Path(__file__).resolve().parent
|
RUN_LOG_PATH = BASE_DIR / "runlog.txt"
|
PATH_TEXT_FILE = BASE_DIR / "path_text_input.txt"
|
PATH_EXPORT_FILE = PATH_TEXT_FILE.with_suffix(".c")
|
STATE_FILE = BASE_DIR / "dashboard_state.json"
|
|
|
def load_path_points(path_file: Path) -> List[Tuple[float, float]]:
|
if not path_file.exists():
|
raise FileNotFoundError(str(path_file))
|
text = path_file.read_text(encoding="utf-8").strip()
|
if not text:
|
return []
|
points: List[Tuple[float, float]] = []
|
if path_file.suffix.lower() == ".json" or text.startswith("["):
|
try:
|
data = json.loads(text)
|
for pt in data:
|
if isinstance(pt, (list, tuple)) and len(pt) >= 2:
|
points.append((float(pt[0]), float(pt[1])))
|
return points
|
except json.JSONDecodeError:
|
return []
|
for token in text.split(";"):
|
token = token.strip()
|
if not token:
|
continue
|
parts = token.split(",")
|
if len(parts) < 2:
|
continue
|
try:
|
points.append((float(parts[0]), float(parts[1])))
|
except ValueError:
|
continue
|
return points
|
|
|
def parse_path_text(text: str) -> List[Tuple[float, float]]:
|
tokens = text.replace("\n", " ").split(";")
|
points: List[Tuple[float, float]] = []
|
for token in tokens:
|
token = token.strip()
|
if not token:
|
continue
|
parts = token.split(",")
|
if len(parts) < 2:
|
continue
|
try:
|
x = float(parts[0].strip())
|
y = float(parts[1].strip())
|
points.append((x, y))
|
except ValueError:
|
continue
|
return points
|
|
|
def format_path_for_mcu(points: List[Tuple[float, float]], decimals: int = 2) -> str:
|
if not points:
|
return "const float g_motion_path_xy[] = {};\n"
|
fmt = f"{{:.{decimals}f}}"
|
lines = ["const float g_motion_path_xy[] = {"]
|
for idx, (x, y) in enumerate(points):
|
comma = "," if idx < len(points) - 1 else ""
|
lines.append(f" {fmt.format(x)}f, {fmt.format(y)}f{comma}")
|
lines.append("};")
|
return "\n".join(lines)
|
|
|
def gga_to_short_origin(gga: str) -> str:
|
text = (gga or "").strip()
|
if not text:
|
return ""
|
body = text.split("*")[0]
|
parts = [p.strip() for p in body.split(",")]
|
if len(parts) < 6:
|
return ""
|
lat = parts[2]
|
lat_dir = parts[3]
|
lon = parts[4]
|
lon_dir = parts[5]
|
alt = parts[9] if len(parts) > 9 and parts[9] else ""
|
short = f"{lat},{lat_dir},{lon},{lon_dir}"
|
if alt:
|
short += f",{alt}"
|
return short
|
|
|
def build_gga_from_short_origin(text: str, template: str) -> str:
|
tokens = [t.strip() for t in text.replace("\n", " ").split(",") if t.strip()]
|
if len(tokens) < 4:
|
raise ValueError("原点格式应为 'lat,N,lon,E[,alt]'。")
|
lat_dm = tokens[0]
|
lat_dir = tokens[1].upper()
|
lon_dm = tokens[2]
|
lon_dir = tokens[3].upper()
|
alt = tokens[4] if len(tokens) >= 5 else None
|
if lat_dir not in ("N", "S") or lon_dir not in ("E", "W"):
|
raise ValueError("方向必须是 N/S/E/W。")
|
|
base = template or DEFAULT_ORIGIN_GGA
|
newline = ""
|
if base.endswith("\r\n"):
|
base = base[:-2]
|
newline = "\r\n"
|
body = base.split("*")[0]
|
fields = body.split(",")
|
while len(fields) <= 13:
|
fields.append("")
|
fields[0] = fields[0] or "$GNGGA"
|
fields[2] = lat_dm
|
fields[3] = lat_dir
|
fields[4] = lon_dm
|
fields[5] = lon_dir
|
if alt is not None:
|
fields[9] = alt
|
body = ",".join(fields)
|
checksum = nmea_checksum(body)
|
return f"{body}*{checksum}{newline}"
|
|
|
class HitlGuiBridge:
|
def __init__(self):
|
self.queue: queue.Queue = queue.Queue(maxsize=200)
|
self._lock = threading.Lock()
|
|
def publish(self, kind: str, data):
|
with self._lock:
|
self._enqueue((kind, data))
|
|
def reset(self):
|
with self._lock:
|
while not self.queue.empty():
|
try:
|
self.queue.get_nowait()
|
except queue.Empty:
|
break
|
|
def _enqueue(self, item):
|
try:
|
self.queue.put_nowait(item)
|
except queue.Full:
|
try:
|
self.queue.get_nowait()
|
except queue.Empty:
|
pass
|
try:
|
self.queue.put_nowait(item)
|
except queue.Full:
|
pass
|
|
|
class ZoomableGraphicsView(QtWidgets.QGraphicsView):
|
sceneMouseMoved = QtCore.pyqtSignal(float, float)
|
"""支持鼠标滚轮缩放 + 按住左键平移的视图"""
|
|
def __init__(self, *args, **kwargs):
|
super().__init__(*args, **kwargs)
|
self.setRenderHint(QtGui.QPainter.Antialiasing, True)
|
self.setTransformationAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse)
|
self.setResizeAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse)
|
self.setDragMode(QtWidgets.QGraphicsView.NoDrag)
|
self.setCursor(QtCore.Qt.OpenHandCursor)
|
self._zoom_factor = 1.15 # 每次滚轮的缩放因子
|
self._panning = False
|
self._pan_last_pos = QtCore.QPoint()
|
self.on_user_pan = None # 由外部注入的回调
|
|
def wheelEvent(self, event: QtGui.QWheelEvent):
|
delta = event.angleDelta().y()
|
scale_factor = self._zoom_factor if delta > 0 else 1.0 / self._zoom_factor
|
self.scale(scale_factor, scale_factor)
|
|
def mousePressEvent(self, event: QtGui.QMouseEvent):
|
if event.button() == QtCore.Qt.LeftButton:
|
self._panning = True
|
self._pan_last_pos = event.pos()
|
self.setCursor(QtCore.Qt.ClosedHandCursor)
|
event.accept()
|
return
|
super().mousePressEvent(event)
|
|
def mouseMoveEvent(self, event: QtGui.QMouseEvent):
|
if self._panning:
|
scene_pos = self.mapToScene(event.pos())
|
last_scene_pos = self.mapToScene(self._pan_last_pos)
|
delta = scene_pos - last_scene_pos
|
self._pan_last_pos = event.pos()
|
self.translate(-delta.x(), -delta.y())
|
if self.on_user_pan:
|
self.on_user_pan()
|
event.accept()
|
return
|
self.sceneMouseMoved.emit(*self._scene_coords(event.pos()))
|
super().mouseMoveEvent(event)
|
|
def _scene_coords(self, view_pos: QtCore.QPoint) -> Tuple[float, float]:
|
scene_pos = self.mapToScene(view_pos)
|
return scene_pos.x(), scene_pos.y()
|
|
def mouseReleaseEvent(self, event: QtGui.QMouseEvent):
|
if event.button() == QtCore.Qt.LeftButton and self._panning:
|
self._panning = False
|
self.setCursor(QtCore.Qt.OpenHandCursor)
|
event.accept()
|
return
|
super().mouseReleaseEvent(event)
|
|
|
class DashboardStateStore:
|
def __init__(self, path: Path):
|
self.path = Path(path)
|
self.data: Dict[str, object] = {}
|
self._load()
|
|
def _load(self):
|
if not self.path.exists():
|
self.data = {}
|
return
|
try:
|
self.data = json.loads(self.path.read_text(encoding="utf-8"))
|
except Exception:
|
self.data = {}
|
|
def save(self):
|
try:
|
self.path.write_text(json.dumps(self.data, ensure_ascii=False, indent=2), encoding="utf-8")
|
except Exception:
|
pass
|
|
def get_pose(self) -> Optional[Tuple[float, float, float, float]]:
|
pose = self.data.get("pose")
|
if isinstance(pose, list) and len(pose) == 4:
|
try:
|
return tuple(float(v) for v in pose) # type: ignore
|
except (TypeError, ValueError):
|
return None
|
return None
|
|
def set_pose(self, east: float, north: float, up: float, heading: float):
|
self.data["pose"] = [east, north, up, heading]
|
self.save()
|
|
def get_origin(self) -> Optional[str]:
|
origin = self.data.get("origin")
|
if isinstance(origin, str):
|
return origin
|
return None
|
|
def set_origin(self, origin_str: str):
|
self.data["origin"] = origin_str
|
self.save()
|
|
|
class HitlDashboard(QtWidgets.QMainWindow):
|
def __init__(self,
|
simulator: HitlSimulator,
|
bridge: HitlGuiBridge,
|
path_points: List[Tuple[float, float]],
|
initial_uart2: Optional[str],
|
initial_uart5: Optional[str]):
|
super().__init__()
|
self.simulator = simulator
|
self.bridge = bridge
|
self.path_points = list(path_points or [])
|
self.trail_points: List[Tuple[float, float]] = []
|
self.control_status: Optional[ControlStatus] = None
|
self.pose_status: Optional[PoseStatus] = None
|
self.state_status: Optional[StateStatus] = None
|
self.stack_status: Dict[str, StackStatus] = {}
|
self._auto_follow = True
|
|
self.setWindowTitle("HITL 仿真状态面板")
|
self.resize(1280, 720)
|
|
self._serial_open = False
|
self._last_ports: List[str] = []
|
|
self.state_store = DashboardStateStore(STATE_FILE)
|
|
self._build_ui(initial_uart2, initial_uart5)
|
self._load_saved_path_text()
|
if not self.path_text.toPlainText().strip() and self.path_points:
|
self._update_path_text_from_points(self.path_points)
|
self._load_saved_pose()
|
self._init_scene()
|
|
self.timer = QtCore.QTimer(self)
|
self.timer.timeout.connect(self._drain_queue)
|
self.timer.start(80)
|
|
def _build_ui(self, initial_uart2: Optional[str], initial_uart5: Optional[str]):
|
central = QtWidgets.QWidget()
|
layout = QtWidgets.QHBoxLayout(central)
|
self.setCentralWidget(central)
|
|
self.scene = QtWidgets.QGraphicsScene(self)
|
left_panel = QtWidgets.QWidget()
|
left_layout = QtWidgets.QVBoxLayout(left_panel)
|
left_layout.setContentsMargins(0, 0, 0, 0)
|
left_layout.setSpacing(4)
|
|
self.view = ZoomableGraphicsView(self.scene)
|
self.view.on_user_pan = self._handle_user_pan
|
self.view.sceneMouseMoved.connect(self._update_mouse_pos)
|
left_layout.addWidget(self.view, stretch=1)
|
|
self.follow_checkbox = QtWidgets.QCheckBox("自动跟随车辆")
|
self.follow_checkbox.setChecked(True)
|
self.follow_checkbox.toggled.connect(self._on_follow_toggled)
|
left_layout.addWidget(self.follow_checkbox, alignment=QtCore.Qt.AlignLeft)
|
|
self.mouse_pos_label = QtWidgets.QLabel("E: 0.00 N: 0.00")
|
self.mouse_pos_label.setStyleSheet("color:#222; background:rgba(255,255,255,190); padding:2px 6px; border-radius:4px;")
|
left_layout.addWidget(self.mouse_pos_label, alignment=QtCore.Qt.AlignRight)
|
|
layout.addWidget(left_panel, stretch=3)
|
|
self._port_refresh_supported = list_ports is not None
|
|
right_panel = QtWidgets.QWidget()
|
right_layout = QtWidgets.QVBoxLayout(right_panel)
|
right_layout.setContentsMargins(8, 0, 0, 0)
|
layout.addWidget(right_panel, stretch=2)
|
|
# Serial configuration
|
serial_group = QtWidgets.QGroupBox("串口设置")
|
serial_layout = QtWidgets.QGridLayout(serial_group)
|
self.port2_combo = QtWidgets.QComboBox()
|
self.port5_combo = QtWidgets.QComboBox()
|
self.port_refresh_btn = QtWidgets.QPushButton("刷新")
|
self.serial_toggle_btn = QtWidgets.QPushButton("打开串口")
|
self.serial_status_label = QtWidgets.QLabel("状态: 未连接")
|
self.serial_status_label.setStyleSheet("color: gray;")
|
serial_layout.addWidget(QtWidgets.QLabel("UART2:"), 0, 0)
|
serial_layout.addWidget(self.port2_combo, 0, 1)
|
serial_layout.addWidget(QtWidgets.QLabel("UART5:"), 1, 0)
|
serial_layout.addWidget(self.port5_combo, 1, 1)
|
serial_layout.addWidget(self.port_refresh_btn, 0, 2, 2, 1)
|
serial_layout.addWidget(self.serial_toggle_btn, 2, 0, 1, 3)
|
serial_layout.addWidget(self.serial_status_label, 3, 0, 1, 3)
|
right_layout.addWidget(serial_group)
|
|
if list_ports is None:
|
self.port_refresh_btn.setEnabled(False)
|
self.port2_combo.setEditable(True)
|
self.port5_combo.setEditable(True)
|
|
# Path text input
|
path_group = QtWidgets.QGroupBox("路径轨迹")
|
path_layout = QtWidgets.QVBoxLayout(path_group)
|
self.path_text = QtWidgets.QPlainTextEdit()
|
self.path_text.setPlaceholderText("示例:23.36,-42.93;29.26,-52.99; ... ;")
|
self.path_text.setWordWrapMode(QtGui.QTextOption.NoWrap)
|
button_row = QtWidgets.QHBoxLayout()
|
self.path_load_btn = QtWidgets.QPushButton("加载到地图")
|
self.path_export_btn = QtWidgets.QPushButton("导出 MCU 格式")
|
button_row.addWidget(self.path_load_btn)
|
button_row.addWidget(self.path_export_btn)
|
path_layout.addWidget(self.path_text)
|
path_layout.addLayout(button_row)
|
right_layout.addWidget(path_group)
|
|
# Origin / pose reset
|
origin_group = QtWidgets.QGroupBox("原点/起点")
|
grid = QtWidgets.QGridLayout(origin_group)
|
default_origin_short = gga_to_short_origin(self.simulator.config.origin_gga)
|
self.origin_text = QtWidgets.QLineEdit(default_origin_short)
|
self.origin_text.setPlaceholderText("3949.9120,N,11616.8544,E[,alt]")
|
self.origin_btn = QtWidgets.QPushButton("更新原点")
|
self.copy_origin_btn = QtWidgets.QPushButton("复制宏定义") # 新增按钮
|
grid.addWidget(QtWidgets.QLabel("坐标 (lat,N,lon,E):"), 0, 0, 1, 2)
|
grid.addWidget(self.origin_text, 0, 2, 1, 2)
|
grid.addWidget(self.origin_btn, 0, 4)
|
grid.addWidget(self.copy_origin_btn, 0, 5) # 添加到布局
|
|
labels = ["E (m)", "N (m)", "U (m)", "Heading (deg)"]
|
self.pos_spin: List[QtWidgets.QDoubleSpinBox] = []
|
for idx, label in enumerate(labels):
|
spin = QtWidgets.QDoubleSpinBox()
|
if idx < 3:
|
spin.setRange(-1000.0, 1000.0)
|
spin.setDecimals(3)
|
else:
|
spin.setRange(-360.0, 360.0)
|
spin.setDecimals(2)
|
grid.addWidget(QtWidgets.QLabel(label), 1 + idx // 2, (idx % 2) * 2)
|
grid.addWidget(spin, 1 + idx // 2, (idx % 2) * 2 + 1)
|
self.pos_spin.append(spin)
|
self.reset_pose_btn = QtWidgets.QPushButton("重置位置")
|
grid.addWidget(self.reset_pose_btn, 3, 0, 1, 4)
|
right_layout.addWidget(origin_group)
|
|
# Status
|
status_group = QtWidgets.QGroupBox("车辆状态")
|
status_layout = QtWidgets.QVBoxLayout(status_group)
|
self.info_label = QtWidgets.QLabel("等待数据...")
|
self.info_label.setAlignment(QtCore.Qt.AlignTop | QtCore.Qt.AlignLeft)
|
self.info_label.setStyleSheet("font-family: Consolas, 'Courier New'; font-size: 12px;")
|
info_scroll = QtWidgets.QScrollArea()
|
info_scroll.setWidgetResizable(True)
|
info_scroll.setWidget(self.info_label)
|
status_layout.addWidget(info_scroll)
|
right_layout.addWidget(status_group, stretch=1)
|
|
# Stack table
|
stack_group = QtWidgets.QGroupBox("堆栈监测")
|
stack_layout = QtWidgets.QVBoxLayout(stack_group)
|
self.stack_table = QtWidgets.QTableWidget(0, 3)
|
self.stack_table.setHorizontalHeaderLabels(["任务", "堆栈余量(word)", "剩余堆(byte)"])
|
self.stack_table.horizontalHeader().setStretchLastSection(True)
|
stack_layout.addWidget(self.stack_table)
|
right_layout.addWidget(stack_group)
|
|
# Log
|
log_group = QtWidgets.QGroupBox("串口日志")
|
log_layout = QtWidgets.QVBoxLayout(log_group)
|
self.log_view = QtWidgets.QPlainTextEdit()
|
self.log_view.setReadOnly(True)
|
log_layout.addWidget(self.log_view)
|
right_layout.addWidget(log_group, stretch=1)
|
|
# Scene items
|
self.path_item = self.scene.addPath(QtGui.QPainterPath(), QtGui.QPen(QtGui.QColor("gray"), 0))
|
self.trail_item = self.scene.addPath(QtGui.QPainterPath(), QtGui.QPen(QtGui.QColor("blue"), 0))
|
self.robot_size = 0.25
|
self.robot_item = self.scene.addEllipse(
|
-self.robot_size,
|
-self.robot_size,
|
self.robot_size * 2,
|
self.robot_size * 2,
|
QtGui.QPen(QtGui.QColor("#d11d29"), 0),
|
QtGui.QBrush(QtGui.QColor("#ff4b5c")),
|
)
|
arrow_pen = QtGui.QPen(QtGui.QColor("#ff8c00"), 0)
|
arrow_pen.setJoinStyle(QtCore.Qt.RoundJoin)
|
self.heading_item = self.scene.addPath(
|
QtGui.QPainterPath(),
|
arrow_pen,
|
QtGui.QBrush(QtGui.QColor("#ff8c00")),
|
)
|
self.target_radius = 0.08
|
self.target_item = self.scene.addEllipse(
|
-self.target_radius,
|
-self.target_radius,
|
self.target_radius * 2,
|
self.target_radius * 2,
|
QtGui.QPen(QtGui.QColor("green")),
|
QtGui.QBrush(QtGui.QColor("green")),
|
)
|
|
self.path_load_btn.clicked.connect(self._load_path_from_text)
|
self.path_export_btn.clicked.connect(self._export_path_to_mcu)
|
self.origin_btn.clicked.connect(self._update_origin)
|
self.copy_origin_btn.clicked.connect(self._copy_origin_macro) # 绑定事件
|
self.reset_pose_btn.clicked.connect(self._reset_position)
|
self.port_refresh_btn.clicked.connect(self._refresh_serial_ports)
|
self.serial_toggle_btn.clicked.connect(self._toggle_serial)
|
|
self._refresh_serial_ports(initial_uart2, initial_uart5)
|
self._set_controls_enabled(False)
|
self._update_serial_ui()
|
|
def _copy_origin_macro(self):
|
text = self.origin_text.text().strip()
|
if not text:
|
QtWidgets.QMessageBox.warning(self, "复制", "原点输入框为空")
|
return
|
|
# 格式: lat_val,N,lon_val,E,alt
|
# 例如: 3949.9120,N,11616.8544,E,47.5
|
parts = [p.strip() for p in text.split(",")]
|
if len(parts) < 4:
|
QtWidgets.QMessageBox.warning(self, "复制", "原点格式错误,需至少包含经纬度")
|
return
|
|
try:
|
# 解析 DDMM.MMMM 格式转 DD.DDDD
|
# 纬度
|
lat_raw = parts[0]
|
lat_deg_int = int(float(lat_raw) / 100)
|
lat_min = float(lat_raw) % 100
|
lat_dd = lat_deg_int + lat_min / 60.0
|
if parts[1].upper() == 'S':
|
lat_dd = -lat_dd
|
|
# 经度
|
lon_raw = parts[2]
|
lon_deg_int = int(float(lon_raw) / 100)
|
lon_min = float(lon_raw) % 100
|
lon_dd = lon_deg_int + lon_min / 60.0
|
if parts[3].upper() == 'W':
|
lon_dd = -lon_dd
|
|
# 高度
|
alt = "0.0"
|
if len(parts) >= 5:
|
alt = parts[4]
|
|
macro = (
|
f"#define MC_CFG_ORIGIN_LAT_DEG ({lat_dd:.6f})\n"
|
f"#define MC_CFG_ORIGIN_LON_DEG ({lon_dd:.6f})\n"
|
f"#define MC_CFG_ORIGIN_ALT_M ({alt})"
|
)
|
|
clipboard = QtWidgets.QApplication.clipboard()
|
clipboard.setText(macro)
|
QtWidgets.QMessageBox.information(self, "复制", "已复制宏定义到剪贴板!")
|
|
except ValueError as e:
|
QtWidgets.QMessageBox.warning(self, "复制", f"数值解析错误: {e}")
|
|
def _handle_user_pan(self):
|
if self._auto_follow:
|
self.follow_checkbox.blockSignals(True)
|
self.follow_checkbox.setChecked(False)
|
self.follow_checkbox.blockSignals(False)
|
self._auto_follow = False
|
|
def _on_follow_toggled(self, checked: bool):
|
self._auto_follow = checked
|
if checked and self.pose_status:
|
self.view.centerOn(self.pose_status.east, -self.pose_status.north)
|
|
def _update_mouse_pos(self, east: float, scene_y: float):
|
north = -scene_y
|
self.mouse_pos_label.setText(f"E: {east: .2f} N: {north: .2f}")
|
|
def _refresh_serial_ports(self, initial_uart2: Optional[str] = None, initial_uart5: Optional[str] = None):
|
ports: List[str] = []
|
if list_ports:
|
try:
|
ports = [p.device for p in list_ports.comports()]
|
except Exception:
|
ports = []
|
saved_uart2 = initial_uart2 or self.simulator.config.uart2_port or ""
|
saved_uart5 = initial_uart5 or self.simulator.config.uart5_port or ""
|
|
def _fill(combo: QtWidgets.QComboBox, saved: str):
|
current = combo.currentText()
|
combo.blockSignals(True)
|
combo.clear()
|
if ports:
|
combo.addItems(ports)
|
if saved and saved not in ports:
|
combo.addItem(saved)
|
target = saved or current
|
if target:
|
idx = combo.findText(target)
|
if idx >= 0:
|
combo.setCurrentIndex(idx)
|
combo.blockSignals(False)
|
|
_fill(self.port2_combo, saved_uart2)
|
_fill(self.port5_combo, saved_uart5)
|
self._last_ports = ports
|
|
def _set_controls_enabled(self, enabled: bool):
|
self.reset_pose_btn.setEnabled(enabled)
|
self.origin_text.setReadOnly(False)
|
self.origin_btn.setEnabled(True)
|
|
def _load_saved_path_text(self):
|
if PATH_TEXT_FILE.exists():
|
try:
|
text = PATH_TEXT_FILE.read_text(encoding="utf-8")
|
except Exception:
|
text = ""
|
self.path_text.setPlainText(text)
|
points = parse_path_text(text)
|
if points:
|
self.path_points = points
|
|
def _update_path_text_from_points(self, points: List[Tuple[float, float]]):
|
if not points:
|
self.path_text.clear()
|
return
|
text = ";".join(f"{x:.2f},{y:.2f}" for x, y in points) + ";"
|
self.path_text.setPlainText(text)
|
|
def _save_path_text(self, text: str):
|
try:
|
PATH_TEXT_FILE.write_text(text.strip(), encoding="utf-8")
|
except Exception as exc:
|
QtWidgets.QMessageBox.warning(self, "路径", f"保存路径文本失败: {exc}")
|
|
def _load_saved_pose(self):
|
pose = self.state_store.get_pose()
|
if pose:
|
self.pos_spin[0].setValue(pose[0])
|
self.pos_spin[1].setValue(pose[1])
|
self.pos_spin[2].setValue(pose[2])
|
self.pos_spin[3].setValue(pose[3])
|
|
def _load_path_from_text(self):
|
text = self.path_text.toPlainText().strip()
|
points = parse_path_text(text)
|
if not points:
|
QtWidgets.QMessageBox.warning(self, "路径", "未解析到有效的轨迹点,请检查输入格式。")
|
return
|
self.path_points = points
|
self.trail_points.clear()
|
self._save_path_text(text)
|
self._init_scene()
|
self._refresh_view()
|
QtWidgets.QMessageBox.information(self, "路径", f"已加载 {len(points)} 个路径点。")
|
|
def _export_path_to_mcu(self):
|
text = self.path_text.toPlainText().strip()
|
points = parse_path_text(text)
|
if not points:
|
QtWidgets.QMessageBox.warning(self, "导出", "没有可导出的路径点,请先加载路径。")
|
return
|
formatted = format_path_for_mcu(points)
|
clipboard = QtWidgets.QApplication.clipboard()
|
clipboard.setText(formatted)
|
try:
|
PATH_EXPORT_FILE.write_text(formatted, encoding="utf-8")
|
except Exception as exc:
|
QtWidgets.QMessageBox.warning(self, "导出", f"写入文件失败: {exc}")
|
QtWidgets.QMessageBox.information(self, "导出", "已复制到剪贴板,并写入:\n" + str(PATH_EXPORT_FILE))
|
|
def _update_serial_ui(self):
|
self.serial_toggle_btn.setText("关闭串口" if self._serial_open else "打开串口")
|
if self._serial_open:
|
uart2 = self.simulator.uart2.port or "未知"
|
uart5 = self.simulator.log_uart.port or "无"
|
self.serial_status_label.setText(f"状态: 已连接 (UART2={uart2}, UART5={uart5})")
|
self.serial_status_label.setStyleSheet("color: green;")
|
else:
|
self.serial_status_label.setText("状态: 未连接")
|
self.serial_status_label.setStyleSheet("color: gray;")
|
enable_ports = not self._serial_open
|
self.port2_combo.setEnabled(enable_ports or self.port2_combo.isEditable())
|
self.port5_combo.setEnabled(enable_ports or self.port5_combo.isEditable())
|
if self._port_refresh_supported:
|
self.port_refresh_btn.setEnabled(not self._serial_open)
|
|
def _toggle_serial(self):
|
if self._serial_open:
|
self._close_serial()
|
return
|
uart2 = self.port2_combo.currentText().strip()
|
if not uart2:
|
QtWidgets.QMessageBox.warning(self, "串口", "请先选择 UART2 端口。")
|
return
|
uart5_text = self.port5_combo.currentText().strip()
|
uart5 = uart5_text or None
|
self.simulator.config.uart2_port = uart2
|
self.simulator.config.uart5_port = uart5
|
self.simulator.uart2.port = uart2
|
self.simulator.log_uart.port = uart5
|
try:
|
self.simulator.start()
|
except Exception as exc: # pragma: no cover - 依赖串口环境
|
self.simulator.stop()
|
QtWidgets.QMessageBox.critical(self, "串口", f"打开串口失败: {exc}")
|
return
|
self._serial_open = True
|
self.bridge.reset()
|
self._set_controls_enabled(True)
|
self._update_serial_ui()
|
|
def _close_serial(self):
|
self.simulator.stop()
|
self.bridge.reset()
|
self.trail_points.clear()
|
self.control_status = None
|
self.pose_status = None
|
self.state_status = None
|
self.stack_status.clear()
|
self.log_view.clear()
|
self.info_label.setText("等待数据...")
|
self.stack_table.setRowCount(0)
|
self._serial_open = False
|
self._set_controls_enabled(False)
|
self._update_serial_ui()
|
self._refresh_view()
|
|
def closeEvent(self, event):
|
if self._serial_open:
|
self._close_serial()
|
super().closeEvent(event)
|
|
def _update_origin(self):
|
text = self.origin_text.text().strip()
|
if not text:
|
QtWidgets.QMessageBox.warning(self, "原点", "请输入坐标,格式:lat,N,lon,E[,alt]")
|
return
|
template = self.simulator.config.origin_gga or DEFAULT_ORIGIN_GGA
|
try:
|
gga = build_gga_from_short_origin(text, template)
|
except ValueError as exc:
|
QtWidgets.QMessageBox.warning(self, "原点", f"格式错误: {exc}")
|
return
|
self.simulator.update_origin(gga)
|
self.simulator.config.origin_gga = gga
|
self.origin_text.setText(gga_to_short_origin(gga))
|
self.state_store.set_origin(gga)
|
QtWidgets.QMessageBox.information(self, "原点", "原点已更新。")
|
|
def _reset_position(self):
|
east = self.pos_spin[0].value()
|
north = self.pos_spin[1].value()
|
up = self.pos_spin[2].value()
|
heading = self.pos_spin[3].value()
|
self.simulator.reset_state(east, north, up, heading)
|
self.state_store.set_pose(east, north, up, heading)
|
QtWidgets.QMessageBox.information(self, "位置", "已重置仿真状态。")
|
|
def _init_scene(self):
|
if not self.path_points:
|
self.scene.setSceneRect(-250, -250, 500, 500)
|
return
|
path = QtGui.QPainterPath()
|
first = True
|
xs = []
|
ys = []
|
for px, py in self.path_points:
|
xs.append(px)
|
ys.append(-py)
|
if first:
|
path.moveTo(px, -py)
|
first = False
|
else:
|
path.lineTo(px, -py)
|
self.path_item.setPath(path)
|
min_x = min(xs)
|
max_x = max(xs)
|
min_y = min(ys)
|
max_y = max(ys)
|
width = max_x - min_x
|
height = max_y - min_y
|
pad_x = max(10.0, width * 0.75)
|
pad_y = max(10.0, height * 0.75)
|
scene_rect = QtCore.QRectF(
|
min_x - pad_x,
|
min_y - pad_y,
|
(width if width > 0 else 1.0) + pad_x * 2.0,
|
(height if height > 0 else 1.0) + pad_y * 2.0,
|
)
|
self.scene.setSceneRect(scene_rect)
|
|
def _append_log(self, text: str):
|
self.log_view.appendPlainText(text)
|
self.log_view.verticalScrollBar().setValue(self.log_view.verticalScrollBar().maximum())
|
|
def _drain_queue(self):
|
updated = False
|
while True:
|
try:
|
kind, data = self.bridge.queue.get_nowait()
|
except queue.Empty:
|
break
|
if kind == "control":
|
self.control_status = data
|
elif kind == "pose":
|
self.pose_status = data
|
self.trail_points.append((data.east, data.north))
|
if len(self.trail_points) > 2000:
|
self.trail_points.pop(0)
|
elif kind == "state":
|
self.state_status = data
|
elif kind == "stack":
|
self.stack_status[data.task_name] = data
|
elif kind == "log":
|
self._append_log(data)
|
updated = True
|
if updated:
|
self._refresh_view()
|
|
def _refresh_view(self):
|
info_lines = []
|
if self.pose_status:
|
pose = self.pose_status
|
info_lines += [
|
"位置 (ENU, m):",
|
f" E: {pose.east:+7.3f}",
|
f" N: {pose.north:+7.3f}",
|
f" U: {pose.up:+7.3f}",
|
"",
|
"姿态 (deg):",
|
f" Heading: {pose.heading_deg:+7.2f}",
|
f" Pitch : {pose.pitch_deg:+7.2f}",
|
f" Roll : {pose.roll_deg:+7.2f}",
|
f" Target : ({pose.target_east:+6.2f}, {pose.target_north:+6.2f})",
|
]
|
if self.control_status:
|
ctrl = self.control_status
|
info_lines += [
|
"",
|
"控制量:",
|
f" Forward : {ctrl.forward_mps:+6.3f} m/s",
|
f" Turn : {ctrl.turn_rate:+6.3f} rad/s",
|
f" PWM : steer={ctrl.steering_pwm} throttle={ctrl.throttle_pwm}",
|
f" Freq : {ctrl.freq_hz:6.2f} Hz",
|
f" Stage : {ctrl.stage}",
|
]
|
if self.state_status:
|
st = self.state_status
|
info_lines += [
|
"",
|
f"算法状态: {st.stage}",
|
f" XTE : {st.cross_track_error:+6.3f} m",
|
f" Heading: {st.heading_error_deg:+6.2f} deg",
|
]
|
if self.stack_status:
|
info_lines += ["", "堆栈监测:"]
|
for item in self.stack_status.values():
|
info_lines.append(
|
f" {item.task_name}: stack={item.stack_high_water} words heap={item.heap_free_bytes} B"
|
)
|
if not info_lines:
|
info_lines = ["等待数据..."]
|
self.info_label.setText("\n".join(info_lines))
|
self._update_scene_items()
|
self._refresh_stack_table()
|
|
def _refresh_stack_table(self):
|
self.stack_table.setRowCount(len(self.stack_status))
|
for row, status in enumerate(self.stack_status.values()):
|
self.stack_table.setItem(row, 0, QtWidgets.QTableWidgetItem(status.task_name))
|
self.stack_table.setItem(row, 1, QtWidgets.QTableWidgetItem(str(status.stack_high_water)))
|
self.stack_table.setItem(row, 2, QtWidgets.QTableWidgetItem(str(status.heap_free_bytes)))
|
|
def _update_scene_items(self):
|
if self.trail_points:
|
trail = QtGui.QPainterPath()
|
first = True
|
for x, y in self.trail_points[-1500:]:
|
scene_y = -y
|
if first:
|
trail.moveTo(x, scene_y)
|
first = False
|
else:
|
trail.lineTo(x, scene_y)
|
self.trail_item.setPath(trail)
|
if self.pose_status:
|
pose = self.pose_status
|
x = pose.east
|
y = -pose.north
|
size = self.robot_size
|
self.robot_item.setRect(x - size, y - size, size * 2, size * 2)
|
|
# 绘制航向角箭头(与车体分离的橙色指向箭头)
|
# pose.heading_deg 为罗盘角(北=0°,顺时针为正),需转换到数学坐标(东=0°,逆时针为正)
|
heading_rad = math.radians(90.0 - pose.heading_deg)
|
dir_x = math.cos(heading_rad)
|
dir_y = -math.sin(heading_rad) # 视图坐标 Y 向下,因此取反
|
perp_x = -dir_y
|
perp_y = dir_x
|
|
# 射线参数
|
offset = 0.25 # 箭头离车体中心的偏移,避免遮挡
|
shaft_length = 0.85
|
shaft_half_width = 0.07
|
head_length = 0.45
|
head_half_width = 0.28
|
|
start = (
|
x + dir_x * offset,
|
y + dir_y * offset,
|
)
|
shaft_end = (
|
start[0] + dir_x * shaft_length,
|
start[1] + dir_y * shaft_length,
|
)
|
tip = (
|
shaft_end[0] + dir_x * head_length,
|
shaft_end[1] + dir_y * head_length,
|
)
|
|
points = [
|
(
|
start[0] + perp_x * shaft_half_width,
|
start[1] + perp_y * shaft_half_width,
|
),
|
(
|
shaft_end[0] + perp_x * shaft_half_width,
|
shaft_end[1] + perp_y * shaft_half_width,
|
),
|
(
|
shaft_end[0] + perp_x * head_half_width,
|
shaft_end[1] + perp_y * head_half_width,
|
),
|
tip,
|
(
|
shaft_end[0] - perp_x * head_half_width,
|
shaft_end[1] - perp_y * head_half_width,
|
),
|
(
|
shaft_end[0] - perp_x * shaft_half_width,
|
shaft_end[1] - perp_y * shaft_half_width,
|
),
|
(
|
start[0] - perp_x * shaft_half_width,
|
start[1] - perp_y * shaft_half_width,
|
),
|
]
|
|
arrow_path = QtGui.QPainterPath()
|
arrow_path.moveTo(*points[0])
|
for pt in points[1:]:
|
arrow_path.lineTo(*pt)
|
arrow_path.closeSubpath()
|
|
self.heading_item.setPath(arrow_path)
|
target_size = self.target_radius
|
self.target_item.setRect(
|
pose.target_east - target_size,
|
-pose.target_north - target_size,
|
target_size * 2,
|
target_size * 2,
|
)
|
if self._auto_follow:
|
self.view.centerOn(x, y)
|
|
|
def parse_args() -> argparse.Namespace:
|
parser = argparse.ArgumentParser(description="HITL 仿真器 GUI")
|
parser.add_argument("--uart2", default=DEFAULT_UART2_PORT, help="STM32 UART2 串口号")
|
parser.add_argument("--uart5", default=DEFAULT_UART5_PORT, help="STM32 UART5 串口号")
|
parser.add_argument("--origin", default=DEFAULT_ORIGIN_GGA, help="GGA 原点")
|
parser.add_argument("--east", type=float, default=DEFAULT_ENU[0], help="初始 E (m)")
|
parser.add_argument("--north", type=float, default=DEFAULT_ENU[1], help="初始 N (m)")
|
parser.add_argument("--up", type=float, default=DEFAULT_ENU[2], help="初始 U (m)")
|
parser.add_argument("--heading", type=float, default=DEFAULT_HEADING_DEG, help="初始航向 (deg)")
|
parser.add_argument("--gps-baud", type=int, default=DEFAULT_UART2_BAUD, help="UART2 波特率")
|
parser.add_argument("--log-baud", type=int, default=DEFAULT_UART5_BAUD, help="UART5 波特率")
|
parser.add_argument("--path", help="初始路径文件 (.json/.txt)")
|
parser.add_argument("--no-gui", action="store_true", help="不启动 Qt GUI,仅打印数据")
|
return parser.parse_args()
|
|
|
def main():
|
args = parse_args()
|
|
# 优先从 dashboard_state.json 加载保存的原点和初始位置
|
state_store = DashboardStateStore(STATE_FILE)
|
saved_origin = state_store.get_origin()
|
saved_pose = state_store.get_pose()
|
|
origin_gga = saved_origin if saved_origin else args.origin
|
if saved_pose:
|
initial_enu = (saved_pose[0], saved_pose[1], saved_pose[2])
|
initial_heading = saved_pose[3]
|
else:
|
initial_enu = (args.east, args.north, args.up)
|
initial_heading = args.heading
|
|
cfg = HitlConfig(
|
uart2_port=args.uart2,
|
uart5_port=args.uart5,
|
origin_gga=origin_gga,
|
initial_enu=initial_enu,
|
initial_heading_deg=initial_heading,
|
gps_baudrate=args.gps_baud,
|
log_baudrate=args.log_baud,
|
)
|
|
run_logger = RunLogger(RUN_LOG_PATH)
|
sim = HitlSimulator(cfg, run_logger=run_logger)
|
bridge = HitlGuiBridge()
|
|
def publish(kind, data):
|
bridge.publish(kind, data)
|
|
sim.on_control_status = lambda status: publish("control", status)
|
sim.on_pose_status = lambda status: publish("pose", status)
|
sim.on_state_status = lambda status: publish("state", status)
|
sim.on_stack_status = lambda status: publish("stack", status)
|
sim.on_log = lambda line: publish("log", line)
|
|
path_points: List[Tuple[float, float]] = []
|
if args.path:
|
try:
|
path_points = load_path_points(Path(args.path))
|
except Exception as exc:
|
print(f"[WARN] 路径加载失败: {exc}")
|
|
if QtWidgets is None or args.no_gui:
|
try:
|
sim.start()
|
print("[INFO] HITL 仿真器已启动 (无 GUI 模式)。按 Ctrl+C 退出。")
|
while True:
|
try:
|
kind, data = bridge.queue.get(timeout=1.0)
|
except queue.Empty:
|
continue
|
if kind == "log":
|
print(data)
|
elif kind == "control":
|
print(
|
f"[CTRL] stage={data.stage} "
|
f"F={data.forward_mps:+.2f} m/s T={data.turn_rate:+.2f} rad/s "
|
f"freq={data.freq_hz:.2f} Hz"
|
)
|
except KeyboardInterrupt:
|
print("\n[INFO] 用户中断。")
|
finally:
|
sim.stop()
|
run_logger.close()
|
print("[INFO] 仿真器已停止。")
|
return
|
|
app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([])
|
dashboard = HitlDashboard(sim, bridge, path_points, args.uart2, args.uart5)
|
if args.path:
|
dashboard.path_line.setText(args.path)
|
dashboard.show()
|
try:
|
app.exec_()
|
finally:
|
if dashboard._serial_open:
|
dashboard._close_serial()
|
sim.stop()
|
run_logger.close()
|
print("[INFO] 仿真器已停止。")
|
|
|
if __name__ == "__main__":
|
sys.exit(main())
|