| | |
| | | import time |
| | | import json |
| | | import math |
| | | from typing import Optional, Tuple |
| | | import threading |
| | | import queue |
| | | from pathlib import Path |
| | | from typing import Callable, Dict, Optional, Tuple |
| | | |
| | | from serial.tools import list_ports |
| | | from gps_imu_receiver import GPSIMUReceiver, GPSData, IMUData |
| | | from mower_controller import MowerController, wrap_angle |
| | | |
| | | try: |
| | | from PyQt5 import QtCore, QtGui, QtWidgets |
| | | except Exception: |
| | | QtWidgets = None |
| | | |
| | | BASE_DIR = Path(__file__).resolve().parent |
| | | GUI_STATE_FILE = BASE_DIR / "gui_state.json" |
| | | GUI_STATE_DEFAULT = { |
| | | "path_file": "", |
| | | "origin_gga": "", |
| | | "serial_port": "", |
| | | } |
| | | |
| | | |
| | | def load_gui_state() -> Dict[str, str]: |
| | | state = dict(GUI_STATE_DEFAULT) |
| | | try: |
| | | if GUI_STATE_FILE.exists(): |
| | | with GUI_STATE_FILE.open('r', encoding='utf-8') as f: |
| | | data = json.load(f) |
| | | if isinstance(data, dict): |
| | | state.update(data) |
| | | except Exception as exc: |
| | | print(f"[WARN] 读取 GUI 配置失败: {exc}") |
| | | return state |
| | | |
| | | |
| | | def save_gui_state(state: Dict[str, str]) -> None: |
| | | try: |
| | | GUI_STATE_FILE.parent.mkdir(parents=True, exist_ok=True) |
| | | with GUI_STATE_FILE.open('w', encoding='utf-8') as f: |
| | | json.dump(state, f, ensure_ascii=False, indent=2) |
| | | except Exception as exc: |
| | | print(f"[WARN] 保存 GUI 配置失败: {exc}") |
| | | |
| | | |
| | | EARTH_RADIUS_M = 6378137.0 |
| | | WGS84_A = 6378137.0 # 半长轴 |
| | |
| | | return east, north, up |
| | | |
| | | |
| | | if QtWidgets: |
| | | class ZoomableGraphicsView(QtWidgets.QGraphicsView): |
| | | """支持滚轮缩放与拖拽的视图""" |
| | | |
| | | def __init__(self, scene, parent=None): |
| | | super().__init__(scene, parent) |
| | | self.setRenderHint(QtGui.QPainter.Antialiasing, True) |
| | | self.setDragMode(QtWidgets.QGraphicsView.ScrollHandDrag) |
| | | self.setTransformationAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse) |
| | | self.setResizeAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse) |
| | | self.setViewportUpdateMode(QtWidgets.QGraphicsView.SmartViewportUpdate) |
| | | self._zoom = 0 |
| | | |
| | | def wheelEvent(self, event: QtGui.QWheelEvent): |
| | | zoom_in_factor = 1.15 |
| | | zoom_out_factor = 1 / zoom_in_factor |
| | | |
| | | if event.angleDelta().y() > 0: |
| | | factor = zoom_in_factor |
| | | self._zoom += 1 |
| | | else: |
| | | factor = zoom_out_factor |
| | | self._zoom -= 1 |
| | | |
| | | if self._zoom > 50: |
| | | self._zoom = 50 |
| | | return |
| | | if self._zoom < -50: |
| | | self._zoom = -50 |
| | | return |
| | | |
| | | self.scale(factor, factor) |
| | | |
| | | def reset_zoom(self): |
| | | self._zoom = 0 |
| | | self.resetTransform() |
| | | |
| | | |
| | | class QtRealtimeDashboard(QtWidgets.QMainWindow): |
| | | """主线程中的 Qt 状态面板,定时从队列取数据刷新""" |
| | | |
| | | def __init__( |
| | | self, |
| | | controller, |
| | | data_queue: queue.Queue, |
| | | gui_state: Dict[str, str], |
| | | persist_state: Callable[[Dict[str, str]], None], |
| | | ): |
| | | super().__init__() |
| | | self.controller = controller |
| | | self.data_queue = data_queue |
| | | self.gui_state = gui_state or {} |
| | | self.persist_state_cb = persist_state |
| | | self.control_thread: Optional[threading.Thread] = None |
| | | self._last_gui_consume_log = 0.0 |
| | | self._fit_applied = False |
| | | self._path_rect = QtCore.QRectF() |
| | | self.path_points = list(controller.path_points or []) |
| | | |
| | | self.setWindowTitle("Lawnmower 运动控制调试界面") |
| | | self.resize(1200, 680) |
| | | |
| | | self._build_ui() |
| | | self.set_path_points(self.path_points, refit=True) |
| | | self._refresh_serial_ports(initial=True) |
| | | self._apply_saved_values() |
| | | self._update_serial_ui() |
| | | self._update_control_buttons() |
| | | |
| | | self.timer = QtCore.QTimer() |
| | | self.timer.timeout.connect(self._drain_queue) |
| | | self.timer.start(80) # ~12.5Hz 刷新 |
| | | |
| | | def _build_ui(self): |
| | | central = QtWidgets.QWidget() |
| | | layout = QtWidgets.QHBoxLayout(central) |
| | | self.setCentralWidget(central) |
| | | |
| | | self.scene = QtWidgets.QGraphicsScene() |
| | | self.view = ZoomableGraphicsView(self.scene) |
| | | layout.addWidget(self.view, stretch=3) |
| | | |
| | | self.path_pen = QtGui.QPen(QtGui.QColor("gray")) |
| | | self.path_pen.setStyle(QtCore.Qt.DashLine) |
| | | self.path_pen.setWidthF(0.8) |
| | | self.path_pen.setCosmetic(True) |
| | | self.path_item = self.scene.addPath(QtGui.QPainterPath(), self.path_pen) |
| | | |
| | | trail_pen = QtGui.QPen(QtGui.QColor("blue")) |
| | | trail_pen.setWidthF(1.0) |
| | | trail_pen.setCosmetic(True) |
| | | self.trail_path_item = self.scene.addPath(QtGui.QPainterPath(), trail_pen) |
| | | |
| | | self.heading_item = self.scene.addLine(0, 0, 0, 0, QtGui.QPen(QtGui.QColor("red"), 2)) |
| | | robot_pen = QtGui.QPen(QtGui.QColor("red")) |
| | | robot_brush = QtGui.QBrush(QtGui.QColor("red")) |
| | | self.robot_item = self.scene.addEllipse(-0.3, -0.3, 0.6, 0.6, robot_pen, robot_brush) |
| | | target_pen = QtGui.QPen(QtGui.QColor("green")) |
| | | target_brush = QtGui.QBrush(QtGui.QColor("green")) |
| | | self.target_item = self.scene.addEllipse(-0.3, -0.3, 0.6, 0.6, target_pen, target_brush) |
| | | |
| | | right_panel = QtWidgets.QWidget() |
| | | right_layout = QtWidgets.QVBoxLayout(right_panel) |
| | | right_layout.setContentsMargins(8, 0, 0, 0) |
| | | layout.addWidget(right_panel, stretch=2) |
| | | |
| | | path_group = QtWidgets.QGroupBox("路径文件") |
| | | path_layout = QtWidgets.QVBoxLayout(path_group) |
| | | path_row = QtWidgets.QHBoxLayout() |
| | | self.path_line = QtWidgets.QLineEdit() |
| | | self.path_browse_btn = QtWidgets.QPushButton("浏览") |
| | | self.path_load_btn = QtWidgets.QPushButton("加载") |
| | | path_row.addWidget(self.path_line, stretch=1) |
| | | path_row.addWidget(self.path_browse_btn) |
| | | path_layout.addLayout(path_row) |
| | | path_layout.addWidget(self.path_load_btn, alignment=QtCore.Qt.AlignRight) |
| | | right_layout.addWidget(path_group) |
| | | |
| | | origin_group = QtWidgets.QGroupBox("原点 (GGA)") |
| | | origin_layout = QtWidgets.QVBoxLayout(origin_group) |
| | | self.origin_input = QtWidgets.QLineEdit() |
| | | self.origin_update_btn = QtWidgets.QPushButton("更新原点") |
| | | origin_layout.addWidget(self.origin_input) |
| | | origin_layout.addWidget(self.origin_update_btn, alignment=QtCore.Qt.AlignRight) |
| | | right_layout.addWidget(origin_group) |
| | | |
| | | serial_group = QtWidgets.QGroupBox("串口设置") |
| | | serial_layout = QtWidgets.QVBoxLayout(serial_group) |
| | | serial_row = QtWidgets.QHBoxLayout() |
| | | self.port_combo = QtWidgets.QComboBox() |
| | | self.port_refresh_btn = QtWidgets.QPushButton("刷新") |
| | | serial_row.addWidget(self.port_combo, stretch=1) |
| | | serial_row.addWidget(self.port_refresh_btn) |
| | | serial_layout.addLayout(serial_row) |
| | | self.serial_toggle_btn = QtWidgets.QPushButton("打开串口") |
| | | serial_layout.addWidget(self.serial_toggle_btn) |
| | | right_layout.addWidget(serial_group) |
| | | |
| | | control_group = QtWidgets.QGroupBox("运动控制") |
| | | control_layout = QtWidgets.QHBoxLayout(control_group) |
| | | self.start_button = QtWidgets.QPushButton("开始") |
| | | self.stop_button = QtWidgets.QPushButton("停止") |
| | | self.stop_button.setEnabled(False) |
| | | control_layout.addWidget(self.start_button) |
| | | control_layout.addWidget(self.stop_button) |
| | | right_layout.addWidget(control_group) |
| | | |
| | | follow_row = QtWidgets.QHBoxLayout() |
| | | self.follow_checkbox = QtWidgets.QCheckBox("跟随车辆") |
| | | self.follow_checkbox.setChecked(True) |
| | | self.freq_label = QtWidgets.QLabel("控制频率: --.- Hz") |
| | | self.freq_label.setStyleSheet("font-family: Consolas, 'Courier New'; font-size: 12px;") |
| | | follow_row.addWidget(self.follow_checkbox) |
| | | follow_row.addWidget(self.freq_label) |
| | | follow_row.addStretch() |
| | | right_layout.addLayout(follow_row) |
| | | |
| | | self.info_label = QtWidgets.QLabel() |
| | | self.info_label.setAlignment(QtCore.Qt.AlignTop | QtCore.Qt.AlignLeft) |
| | | self.info_label.setStyleSheet("font-family: Consolas, 'Courier New'; font-size: 12px;") |
| | | info_scroll = QtWidgets.QScrollArea() |
| | | info_scroll.setWidgetResizable(True) |
| | | info_scroll.setWidget(self.info_label) |
| | | info_scroll.setMinimumWidth(320) |
| | | right_layout.addWidget(info_scroll, stretch=1) |
| | | |
| | | if self.controller is not None: |
| | | self.path_browse_btn.clicked.connect(self._browse_path_file) |
| | | self.path_load_btn.clicked.connect(self._load_path_from_ui) |
| | | self.origin_update_btn.clicked.connect(self._update_origin_from_ui) |
| | | self.port_refresh_btn.clicked.connect(lambda: self._refresh_serial_ports(initial=False)) |
| | | self.serial_toggle_btn.clicked.connect(self._toggle_serial_connection) |
| | | self.start_button.clicked.connect(self._start_control) |
| | | self.stop_button.clicked.connect(self._stop_control) |
| | | |
| | | def _apply_saved_values(self): |
| | | path_file = self.gui_state.get("path_file", "") |
| | | if path_file: |
| | | self.path_line.setText(path_file) |
| | | origin = self.gui_state.get("origin_gga", "") |
| | | if origin: |
| | | self.origin_input.setText(origin) |
| | | saved_port = self.gui_state.get("serial_port", "") |
| | | if saved_port: |
| | | index = self.port_combo.findText(saved_port) |
| | | if index >= 0: |
| | | self.port_combo.setCurrentIndex(index) |
| | | |
| | | def set_path_points(self, path_points, refit=False): |
| | | self._init_path(path_points or []) |
| | | if refit: |
| | | self._fit_applied = False |
| | | self._ensure_initial_view() |
| | | |
| | | def _init_path(self, path_points): |
| | | self.path_points = list(path_points or []) |
| | | painter_path = QtGui.QPainterPath() |
| | | if not self.path_points: |
| | | self.path_item.setPath(painter_path) |
| | | self.scene.setSceneRect(-10, -10, 20, 20) |
| | | self._path_rect = self.scene.sceneRect() |
| | | return |
| | | |
| | | first = True |
| | | for px, py in self.path_points: |
| | | py = -py |
| | | if first: |
| | | painter_path.moveTo(px, py) |
| | | first = False |
| | | else: |
| | | painter_path.lineTo(px, py) |
| | | self.path_item.setPath(painter_path) |
| | | |
| | | xs = [p[0] for p in self.path_points] |
| | | ys = [-p[1] for p in self.path_points] |
| | | margin = 2.0 |
| | | min_x, max_x = min(xs), max(xs) |
| | | min_y, max_y = min(ys), max(ys) |
| | | if min_x == max_x: |
| | | min_x -= 1 |
| | | max_x += 1 |
| | | if min_y == max_y: |
| | | min_y -= 1 |
| | | max_y += 1 |
| | | self.scene.setSceneRect(min_x - margin, min_y - margin, (max_x - min_x) + 2 * margin, (max_y - min_y) + 2 * margin) |
| | | self._path_rect = self.scene.sceneRect() |
| | | |
| | | def _drain_queue(self): |
| | | updated = False |
| | | processed = 0 |
| | | while True: |
| | | try: |
| | | data = self.data_queue.get_nowait() |
| | | except queue.Empty: |
| | | break |
| | | self._apply_update(data) |
| | | updated = True |
| | | processed += 1 |
| | | if updated: |
| | | self.data_queue.task_done() |
| | | now = time.time() |
| | | if now - self._last_gui_consume_log > 1.0: |
| | | print(f"[GUI] Qt面板刷新 {processed} 条数据,队列剩余 {self.data_queue.qsize()}") |
| | | self._last_gui_consume_log = now |
| | | |
| | | def _apply_update(self, data): |
| | | pos_xyz = data["pos_xyz"] |
| | | heading_deg = data["heading_deg"] |
| | | trail_points = data["trail"] |
| | | target_point = data["target"] |
| | | info_str = data["info_str"] |
| | | control_freq = data.get("control_freq_hz") |
| | | |
| | | path = QtGui.QPainterPath() |
| | | first = True |
| | | for px, py, _pz in trail_points[-1500:]: |
| | | py = -py |
| | | if first: |
| | | path.moveTo(px, py) |
| | | first = False |
| | | else: |
| | | path.lineTo(px, py) |
| | | self.trail_path_item.setPath(path) |
| | | |
| | | x = pos_xyz[0] |
| | | y = -pos_xyz[1] |
| | | self.robot_item.setRect(x - 0.3, y - 0.3, 0.6, 0.6) |
| | | |
| | | arrow_len = 1.0 |
| | | heading_rad = math.radians(heading_deg) |
| | | dx = arrow_len * math.cos(heading_rad) |
| | | dy = arrow_len * math.sin(heading_rad) |
| | | self.heading_item.setLine(x, y, x + dx, y - dy) |
| | | |
| | | if target_point: |
| | | tx = target_point[0] |
| | | ty = -target_point[1] |
| | | self.target_item.setRect(tx - 0.3, ty - 0.3, 0.6, 0.6) |
| | | else: |
| | | self.target_item.setRect(0, 0, 0, 0) |
| | | |
| | | self.info_label.setText(info_str) |
| | | if self.follow_checkbox.isChecked(): |
| | | self._center_on_robot(x, y) |
| | | if control_freq is not None: |
| | | self.freq_label.setText(f"控制频率: {control_freq:5.1f} Hz") |
| | | |
| | | def _ensure_initial_view(self): |
| | | if not self._fit_applied and not self._path_rect.isNull(): |
| | | self.view.reset_zoom() |
| | | self.view.fitInView(self._path_rect, QtCore.Qt.KeepAspectRatio) |
| | | self._fit_applied = True |
| | | |
| | | def _center_on_robot(self, scene_x, scene_y): |
| | | self.view.centerOn(scene_x, scene_y) |
| | | |
| | | def _browse_path_file(self): |
| | | current = self.path_line.text().strip() |
| | | start_dir = Path(current).parent if current else BASE_DIR |
| | | file_name, _ = QtWidgets.QFileDialog.getOpenFileName( |
| | | self, |
| | | "选择路径文件", |
| | | str(start_dir), |
| | | "路径文件 (*.txt *.json);;所有文件 (*)", |
| | | ) |
| | | if file_name: |
| | | self.path_line.setText(file_name) |
| | | |
| | | def _load_path_from_ui(self): |
| | | if self.controller.is_running(): |
| | | QtWidgets.QMessageBox.warning(self, "路径", "请先停止运动控制,再加载路径。") |
| | | return |
| | | path_file = self.path_line.text().strip() |
| | | if not path_file: |
| | | QtWidgets.QMessageBox.warning(self, "路径", "请先选择路径文件。") |
| | | return |
| | | if not Path(path_file).exists(): |
| | | QtWidgets.QMessageBox.warning(self, "路径", "路径文件不存在。") |
| | | return |
| | | if self.controller.load_path(path_file): |
| | | self.set_path_points(self.controller.path_points, refit=True) |
| | | self._save_state({"path_file": path_file}) |
| | | QtWidgets.QMessageBox.information(self, "路径", "路径加载成功。") |
| | | self._update_control_buttons() |
| | | else: |
| | | QtWidgets.QMessageBox.warning(self, "路径", "路径加载失败,请检查文件内容。") |
| | | |
| | | def _update_origin_from_ui(self): |
| | | if self.controller.is_running(): |
| | | QtWidgets.QMessageBox.warning(self, "原点", "请先停止运动控制,再更新原点。") |
| | | return |
| | | origin = self.origin_input.text().strip() |
| | | try: |
| | | self.controller.set_origin_geo(origin) |
| | | except ValueError as exc: |
| | | QtWidgets.QMessageBox.warning(self, "原点", f"原点解析失败: {exc}") |
| | | return |
| | | self._save_state({"origin_gga": origin}) |
| | | QtWidgets.QMessageBox.information(self, "原点", "原点已更新。") |
| | | |
| | | def _refresh_serial_ports(self, initial: bool = False): |
| | | ports = [p.device for p in list_ports.comports()] |
| | | saved_port = self.gui_state.get("serial_port", "") |
| | | if initial and saved_port and saved_port not in ports: |
| | | ports.append(saved_port) |
| | | current = self.port_combo.currentText() |
| | | self.port_combo.blockSignals(True) |
| | | self.port_combo.clear() |
| | | self.port_combo.addItems(ports) |
| | | target = saved_port or current |
| | | if target: |
| | | index = self.port_combo.findText(target) |
| | | if index >= 0: |
| | | self.port_combo.setCurrentIndex(index) |
| | | self.port_combo.blockSignals(False) |
| | | |
| | | def _toggle_serial_connection(self): |
| | | QtWidgets.QMessageBox.warning(self, "串口", "HITL 模式下该操作由仿真器控制") |
| | | |
| | | def _start_control(self): |
| | | QtWidgets.QMessageBox.warning(self, "控制", "HITL 模式下请通过仿真器启动") |
| | | |
| | | def _stop_control(self): |
| | | QtWidgets.QMessageBox.warning(self, "控制", "HITL 模式下请通过仿真器停止") |
| | | |
| | | def _is_control_running(self) -> bool: |
| | | return False |
| | | |
| | | def _save_state(self, updates: Dict[str, str]): |
| | | self.gui_state.update(updates) |
| | | if self.persist_state_cb: |
| | | self.persist_state_cb(updates) |
| | | |
| | | def _update_serial_ui(self): |
| | | self.serial_toggle_btn.setEnabled(False) |
| | | self.port_combo.setEnabled(False) |
| | | self.port_refresh_btn.setEnabled(False) |
| | | |
| | | def _update_control_buttons(self): |
| | | self.start_button.setEnabled(False) |
| | | self.stop_button.setEnabled(False) |
| | | |
| | | @QtCore.pyqtSlot() |
| | | def _on_control_finished(self): |
| | | pass |
| | | |
| | | @QtCore.pyqtSlot(str) |
| | | def _show_error_message(self, message: str): |
| | | QtWidgets.QMessageBox.warning(self, "错误", message) |
| | | |
| | | def closeEvent(self, event): |
| | | super().closeEvent(event) |
| | | else: |
| | | QtRealtimeDashboard = None |
| | | |
| | | |
| | | class GuiBridge: |
| | | """控制线程向 Qt 主线程发送数据的桥接器""" |
| | | |
| | | def __init__(self): |
| | | self.queue: queue.Queue = queue.Queue(maxsize=50) |
| | | self._last_publish_log = 0.0 |
| | | |
| | | def publish(self, data): |
| | | try: |
| | | self.queue.put_nowait(data) |
| | | except queue.Full: |
| | | try: |
| | | self.queue.get_nowait() |
| | | except queue.Empty: |
| | | pass |
| | | try: |
| | | self.queue.put_nowait(data) |
| | | except queue.Full: |
| | | pass |
| | | now = time.time() |
| | | if now - self._last_publish_log > 1.0: |
| | | print(f"[GUI] 发布数据到队列,当前长度 {self.queue.qsize()},显示位置 {data.get('pos_xyz')}") |
| | | self._last_publish_log = now |
| | | |
| | | |
| | | class ControlCommandSender: |
| | | """控制命令发送器 - 发送转向和油门控制信号给STM32""" |
| | | |
| | |
| | | def __init__(self, port: str, baudrate: int = 921600, |
| | | offset_xyz: Tuple[float, float, float] = (0.0, 0.0, 0.0), |
| | | origin_geo: Optional[str] = None, |
| | | origin_alt_m: Optional[float] = None): |
| | | origin_alt_m: Optional[float] = None, |
| | | gui_bridge: Optional[GuiBridge] = None): |
| | | """ |
| | | 初始化实时控制器 |
| | | |
| | |
| | | self.port = port |
| | | self.baudrate = baudrate |
| | | self.offset_xyz = offset_xyz |
| | | self.origin_geo = origin_geo or "" |
| | | self.origin_latlon: Optional[Tuple[float, float]] = None |
| | | self.origin_altitude: Optional[float] = origin_alt_m |
| | | self.origin_lat_rad: Optional[float] = None |
| | | self.origin_lon_rad: Optional[float] = None |
| | | self.origin_ecef: Optional[Tuple[float, float, float]] = None |
| | | |
| | | if origin_geo: |
| | | try: |
| | | lat, lon, alt = _parse_origin_geo(origin_geo) |
| | | self.origin_latlon = (lat, lon) |
| | | # 如果解析出高度,且未手动指定,则优先使用解析出的高度 |
| | | if alt is not None and self.origin_altitude is None: |
| | | self.origin_altitude = alt |
| | | print(f"[INFO] 从原点配置中解析出高度: {self.origin_altitude:.3f} m") |
| | | |
| | | print(f"[INFO] 坐标原点设置: 纬度={self.origin_latlon[0]:.8f}°, 经度={self.origin_latlon[1]:.8f}°") |
| | | except ValueError as exc: |
| | | raise ValueError(f"原点坐标解析失败: {exc}") from exc |
| | | try: |
| | | self._apply_origin_geo(self.origin_geo) |
| | | except ValueError as exc: |
| | | raise ValueError(f"原点坐标解析失败: {exc}") from exc |
| | | |
| | | # GPS/IMU接收器 |
| | | self.receiver = GPSIMUReceiver(port, baudrate) |
| | | self.receiver: Optional[GPSIMUReceiver] = None |
| | | |
| | | # 控制命令发送器 (共用同一个串口) |
| | | self.cmd_sender = None |
| | |
| | | # 数据缓存 |
| | | self.latest_gps: Optional[GPSData] = None |
| | | self.latest_imu: Optional[IMUData] = None |
| | | self.path_points = [] |
| | | self.trail_points = [] |
| | | self.last_log_time = 0.0 |
| | | self.gui_bridge = gui_bridge |
| | | self._stop_event = threading.Event() |
| | | self._last_gui_publish_log = 0.0 |
| | | |
| | | # 统计 |
| | | self.control_count = 0 |
| | | self.control_freq_hz = 0.0 |
| | | self._freq_sample_time: Optional[float] = None |
| | | self._freq_sample_count = 0 |
| | | self._running = False |
| | | |
| | | def load_path(self, path_file: str) -> bool: |
| | | """ |
| | |
| | | } |
| | | |
| | | self.mower_ctrl = MowerController(path_points, params=params) |
| | | self.path_points = path_points |
| | | self.trail_points = [] |
| | | print("[INFO] 运动控制器初始化完成") |
| | | return True |
| | | |
| | |
| | | |
| | | def connect(self) -> bool: |
| | | """连接串口""" |
| | | if self.is_connected(): |
| | | print("[INFO] 串口已连接") |
| | | return True |
| | | if not self.port: |
| | | print("[ERROR] 未指定串口端口") |
| | | return False |
| | | self.receiver = GPSIMUReceiver(self.port, self.baudrate) |
| | | if not self.receiver.connect(): |
| | | self.receiver = None |
| | | return False |
| | | |
| | | # 创建控制命令发送器 (共用串口对象) |
| | |
| | | |
| | | def disconnect(self): |
| | | """断开连接""" |
| | | if self.receiver: |
| | | self.receiver.disconnect() |
| | | self.receiver = None |
| | | self.cmd_sender = None |
| | | |
| | | def is_connected(self) -> bool: |
| | | return bool(self.receiver and self.receiver.serial and self.receiver.serial.is_open) |
| | | |
| | | def is_running(self) -> bool: |
| | | return self._running |
| | | |
| | | def set_port(self, port: str): |
| | | if self.is_connected(): |
| | | raise RuntimeError("请先关闭当前串口,再切换端口") |
| | | self.port = port.strip() |
| | | |
| | | def stop(self): |
| | | """请求停止控制循环""" |
| | | self._stop_event.set() |
| | | |
| | | def _update_visualizer( |
| | | self, |
| | | pos_xyz, |
| | | heading_deg: float, |
| | | pitch: float, |
| | | roll: float, |
| | | forward: int, |
| | | turn: int, |
| | | status: str, |
| | | stage: str, |
| | | target_xy: Optional[Tuple[float, float]], |
| | | ): |
| | | if not self.gui_bridge: |
| | | return |
| | | |
| | | info_lines = [ |
| | | f"状态: {status}", |
| | | f"阶段: {stage}", |
| | | f"控制频率: {self.control_freq_hz:5.1f} Hz", |
| | | "", |
| | | "位置 (ENU, m):", |
| | | f" E: {pos_xyz[0]:+7.2f}", |
| | | f" N: {pos_xyz[1]:+7.2f}", |
| | | f" U: {pos_xyz[2]:+7.2f}", |
| | | "", |
| | | "姿态 (deg):", |
| | | f" Heading: {heading_deg:+7.2f}", |
| | | f" Pitch : {pitch:+7.2f}", |
| | | f" Roll : {roll:+7.2f}", |
| | | "", |
| | | f"控制输出: F={forward:+4d}, T={turn:+4d}", |
| | | ] |
| | | if target_xy: |
| | | info_lines.append(f"瞄准点: ({target_xy[0]:+.2f}, {target_xy[1]:+.2f})") |
| | | |
| | | info_str = "\n".join(info_lines) |
| | | |
| | | if self.gui_bridge: |
| | | trail_snapshot = list(self.trail_points[-1500:]) |
| | | payload = { |
| | | "info_str": info_str, |
| | | "pos_xyz": pos_xyz, |
| | | "heading_deg": heading_deg, |
| | | "trail": trail_snapshot, |
| | | "target": target_xy, |
| | | "control_freq_hz": self.control_freq_hz, |
| | | } |
| | | self.gui_bridge.publish(payload) |
| | | now = time.time() |
| | | if now - self._last_gui_publish_log > 1.0: |
| | | print(f"[GUI] 控制线程推送数据 pos={pos_xyz}, heading={heading_deg:.2f}, trail={len(trail_snapshot)}, target={target_xy}") |
| | | self._last_gui_publish_log = now |
| | | |
| | | def _record_control_tick(self, current_time: float): |
| | | if self._freq_sample_time is None: |
| | | self._freq_sample_time = current_time |
| | | self._freq_sample_count = 1 |
| | | return |
| | | self._freq_sample_count += 1 |
| | | window = current_time - self._freq_sample_time |
| | | if window >= 1.0: |
| | | self.control_freq_hz = self._freq_sample_count / max(window, 1e-6) |
| | | self._freq_sample_count = 0 |
| | | self._freq_sample_time = current_time |
| | | |
| | | def _apply_origin_geo(self, origin_geo: Optional[str]): |
| | | origin_geo = (origin_geo or "").strip() |
| | | self.origin_latlon = None |
| | | self.origin_ecef = None |
| | | self.origin_lat_rad = None |
| | | self.origin_lon_rad = None |
| | | if not origin_geo: |
| | | return |
| | | lat, lon, alt = _parse_origin_geo(origin_geo) |
| | | self.origin_latlon = (lat, lon) |
| | | if alt is not None: |
| | | self.origin_altitude = alt |
| | | print(f"[INFO] 从原点配置中解析出高度: {self.origin_altitude:.3f} m") |
| | | print(f"[INFO] 坐标原点设置: 纬度={self.origin_latlon[0]:.8f}°, 经度={self.origin_latlon[1]:.8f}°") |
| | | |
| | | def set_origin_geo(self, origin_geo: str): |
| | | """更新原点坐标配置""" |
| | | self.origin_geo = origin_geo.strip() |
| | | self._apply_origin_geo(self.origin_geo) |
| | | |
| | | def _ensure_origin_reference(self, fallback_altitude: float): |
| | | """确保已根据原点经纬度初始化ECEF基准""" |
| | |
| | | if not self.mower_ctrl: |
| | | print("[ERROR] 请先加载路径文件") |
| | | return |
| | | if self._running: |
| | | print("[WARN] 控制循环已在运行") |
| | | return |
| | | if not self.is_connected() or not self.receiver: |
| | | print("[ERROR] 请先连接串口") |
| | | return |
| | | if not self.cmd_sender: |
| | | print("[ERROR] 控制命令发送器未就绪") |
| | | return |
| | | |
| | | self._stop_event.clear() |
| | | self._running = True |
| | | |
| | | print("[INFO] 开始实时控制... (按 Ctrl+C 停止)") |
| | | print("[INFO] 等待GPS数据...") |
| | | |
| | | receiver = self.receiver |
| | | try: |
| | | start_time = time.time() |
| | | last_stats_time = start_time |
| | | self.last_log_time = start_time |
| | | |
| | | # 等待第一个有效的GPS数据 |
| | | gps_ready = False |
| | | while not gps_ready: |
| | | while not gps_ready and not self._stop_event.is_set(): |
| | | gps_data, imu_data = self.receiver.receive_packet() |
| | | |
| | | if gps_data: |
| | |
| | | if imu_data: |
| | | self.latest_imu = imu_data |
| | | |
| | | if self._stop_event.is_set(): |
| | | return |
| | | |
| | | print("[INFO] 开始控制循环") |
| | | |
| | | while True: |
| | | while not self._stop_event.is_set(): |
| | | current_time = time.time() |
| | | |
| | | # 接收数据 (非阻塞式,快速轮询) |
| | | gps_data, imu_data = self.receiver.receive_packet() |
| | | gps_data, imu_data = receiver.receive_packet() if receiver else (None, None) |
| | | |
| | | if gps_data: |
| | | self.latest_gps = gps_data |
| | |
| | | if self.latest_gps: |
| | | # 转换GPS数据到本地坐标 |
| | | x, y, z, heading, vx, vy = self._convert_gps_to_local(self.latest_gps) |
| | | self.trail_points.append((x, y, z)) |
| | | if len(self.trail_points) > 4000: |
| | | self.trail_points.pop(0) |
| | | |
| | | # 更新控制器状态 (只用x,y) |
| | | self.mower_ctrl.update_gps((x, y), heading, (vx, vy), current_time) |
| | |
| | | |
| | | # 计算控制信号 |
| | | control_output = self.mower_ctrl.compute_control(current_time) |
| | | control_info = control_output.get('info', {}) |
| | | forward_signal = control_output['forward'] # -100~100 |
| | | turn_signal = control_output['turn'] # -100~100 |
| | | |
| | |
| | | self.cmd_sender.send_control_command(steering_pwm, throttle_pwm) |
| | | |
| | | self.control_count += 1 |
| | | self._record_control_tick(current_time) |
| | | self.last_control_time = current_time |
| | | |
| | | # 打印控制信息 (降低频率) |
| | | if self.control_count % 74 == 0: # 每秒打印一次 |
| | | status = control_output['info'].get('status', 'running') |
| | | xte = control_output['info'].get('xte', 0) |
| | | gps_heading = self.latest_gps.heading_angle |
| | | gps_pitch = self.latest_gps.pitch_angle |
| | | gps_roll = self.latest_gps.roll_angle |
| | | |
| | | # 打印控制信息 (2Hz) |
| | | if current_time - self.last_log_time >= 0.5: |
| | | status = control_info.get('status', 'running') |
| | | xte = control_info.get('xte', 0) |
| | | # 数学角度转地理角度:heading_err_deg |
| | | # 注意 control_output['info']['heading_err'] 是弧度 |
| | | heading_err_deg = math.degrees(control_output['info'].get('heading_err', 0)) |
| | | |
| | | # 获取最新的姿态角 (优先使用 GPS 数据,IMU 作为辅助或高频补充,这里直接用 GPS 结构体中的角度) |
| | | # GPSData 中: heading_angle(0-360), pitch_angle, roll_angle |
| | | # 都是角度制 |
| | | gps_heading = self.latest_gps.heading_angle |
| | | gps_pitch = self.latest_gps.pitch_angle |
| | | gps_roll = self.latest_gps.roll_angle |
| | | heading_err_deg = math.degrees(control_info.get('heading_err', 0)) |
| | | |
| | | print(f"[LOG] POS_ENU: {x:.3f}, {y:.3f}, {z:.3f} | " |
| | | f"ATT(H/P/R): {gps_heading:.2f}°, {gps_pitch:.2f}°, {gps_roll:.2f}° | " |
| | | f"CTRL: S={status}, F={forward_signal}, T={turn_signal}, Err={xte:.2f}m") |
| | | self.last_log_time = current_time |
| | | |
| | | stage_label = control_info.get('stage', getattr(self.mower_ctrl, 'stage', '')) |
| | | target_xy = control_info.get('target_xy') |
| | | self._update_visualizer( |
| | | (x, y, z), |
| | | gps_heading, |
| | | gps_pitch, |
| | | gps_roll, |
| | | forward_signal, |
| | | turn_signal, |
| | | control_info.get('status', 'running'), |
| | | stage_label if stage_label else 'unknown', |
| | | target_xy, |
| | | ) |
| | | |
| | | # 检查是否完成 |
| | | if control_output['info'].get('status') == 'finished': |
| | | if control_info.get('status') == 'finished': |
| | | print("[INFO] 路径跟踪完成!") |
| | | self.stop() |
| | | break |
| | | |
| | | # 统计信息 (每10秒) |
| | |
| | | |
| | | except KeyboardInterrupt: |
| | | print("\n[INFO] 用户中断") |
| | | self.stop() |
| | | |
| | | finally: |
| | | self._running = False |
| | | self._stop_event.clear() |
| | | # 发送停止命令 (中位值) |
| | | if self.cmd_sender: |
| | | print("[INFO] 发送停止命令...") |
| | | for _ in range(5): |
| | | self.cmd_sender.send_control_command(1500, 1500) |
| | |
| | | # 打印最终统计 |
| | | print("\n========== 最终统计 ==========") |
| | | print(f"总控制次数: {self.control_count}") |
| | | print(f"GPS数据包: {self.receiver.gps_count}") |
| | | print(f"IMU数据包: {self.receiver.imu_count}") |
| | | if receiver: |
| | | print(f"GPS数据包: {receiver.gps_count}") |
| | | print(f"IMU数据包: {receiver.imu_count}") |
| | | if self.cmd_sender: |
| | | print(f"命令发送: {self.cmd_sender.get_stats()}") |
| | | print(f"错误计数: {self.receiver.error_count}") |
| | | if receiver: |
| | | print(f"错误计数: {receiver.error_count}") |
| | | print("==============================\n") |
| | | |
| | | |
| | | def main(): |
| | | """主函数""" |
| | | # 配置 |
| | | PORT = "COM17" # 根据实际情况修改 |
| | | BAUDRATE = 921600 |
| | | PATH_FILE = "gecaolujing2.txt" # 路径文件 |
| | | OFFSET_XYZ = (0.0, 0.0, 0.0) # 车辆中心相对GPS天线的ENU偏移(米): (东, 北, 天) |
| | | |
| | | # 原点经纬度 (支持 GGA 报文或简略格式) |
| | | # 示例GGA: "$GNGGA,060956.700,3949.8890014,N,11616.7555551,E,4,20,0.68,46.621,M,-6.679,M,1.0,0409*7F" |
| | | ORIGIN_GEO = "$GNGGA,060956.700,3949.8890014,N,11616.7555551,E,4,20,0.68,46.621,M,-6.679,M,1.0,0409*7F" |
| | | |
| | | ORIGIN_ALT = None # 原点海拔(米),可选。若GGA报文中包含海拔,会自动使用报文中的海拔。 |
| | | |
| | | # 创建控制器 |
| | | DEFAULT_PORT = "COM17" |
| | | DEFAULT_PATH_FILE = "gecaolujing2.txt" |
| | | DEFAULT_ORIGIN_GGA = "$GNGGA,060956.700,3949.8890014,N,11616.7555551,E,4,20,0.68,46.621,M,-6.679,M,1.0,0409*7F" |
| | | OFFSET_XYZ = (0.0, 0.0, 0.0) |
| | | |
| | | gui_state = load_gui_state() |
| | | selected_port = gui_state.get("serial_port") or DEFAULT_PORT |
| | | origin_geo = gui_state.get("origin_gga") or DEFAULT_ORIGIN_GGA |
| | | path_file = gui_state.get("path_file") |
| | | |
| | | gui_bridge = GuiBridge() if QtWidgets else None |
| | | |
| | | controller = RealtimeController( |
| | | PORT, |
| | | selected_port, |
| | | BAUDRATE, |
| | | offset_xyz=OFFSET_XYZ, |
| | | origin_geo=ORIGIN_GEO, |
| | | origin_alt_m=ORIGIN_ALT |
| | | origin_geo=origin_geo, |
| | | origin_alt_m=None, |
| | | gui_bridge=gui_bridge, |
| | | ) |
| | | |
| | | # 加载路径 |
| | | if not controller.load_path(PATH_FILE): |
| | | print("[ERROR] 路径加载失败,退出") |
| | | return |
| | | |
| | | # 连接串口 |
| | | # 预加载路径文件(若存在) |
| | | candidate_path = path_file or DEFAULT_PATH_FILE |
| | | if candidate_path and Path(candidate_path).exists(): |
| | | controller.load_path(candidate_path) |
| | | |
| | | def persist_state(updates: Dict[str, str]): |
| | | gui_state.update(updates) |
| | | save_gui_state(gui_state) |
| | | |
| | | if gui_bridge and QtWidgets: |
| | | app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([]) |
| | | dashboard = QtRealtimeDashboard(controller, gui_bridge.queue, gui_state, persist_state) |
| | | dashboard.show() |
| | | try: |
| | | app.exec_() |
| | | finally: |
| | | controller.stop() |
| | | controller.disconnect() |
| | | else: |
| | | # 无Qt环境时,沿用旧的命令行工作流 |
| | | if not controller.path_points and Path(DEFAULT_PATH_FILE).exists(): |
| | | controller.load_path(DEFAULT_PATH_FILE) |
| | | if not controller.connect(): |
| | | print("[ERROR] 串口连接失败,退出") |
| | | return |
| | | |
| | | # 运行控制循环 |
| | | try: |
| | | controller.run() |
| | | finally: |