#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 实时闭环控制系统 接收 GPS/IMU 数据 → 运动控制算法 → 下发控制命令 """ import serial import struct import time import json import math import threading import queue from pathlib import Path from typing import Callable, Dict, Optional, Tuple from serial.tools import list_ports from gps_imu_receiver import GPSIMUReceiver, GPSData, IMUData from mower_controller import MowerController, wrap_angle try: from PyQt5 import QtCore, QtGui, QtWidgets except Exception: QtWidgets = None BASE_DIR = Path(__file__).resolve().parent GUI_STATE_FILE = BASE_DIR / "gui_state.json" GUI_STATE_DEFAULT = { "path_file": "", "origin_gga": "", "serial_port": "", } def load_gui_state() -> Dict[str, str]: state = dict(GUI_STATE_DEFAULT) try: if GUI_STATE_FILE.exists(): with GUI_STATE_FILE.open('r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, dict): state.update(data) except Exception as exc: print(f"[WARN] 读取 GUI 配置失败: {exc}") return state def save_gui_state(state: Dict[str, str]) -> None: try: GUI_STATE_FILE.parent.mkdir(parents=True, exist_ok=True) with GUI_STATE_FILE.open('w', encoding='utf-8') as f: json.dump(state, f, ensure_ascii=False, indent=2) except Exception as exc: print(f"[WARN] 保存 GUI 配置失败: {exc}") EARTH_RADIUS_M = 6378137.0 WGS84_A = 6378137.0 # 半长轴 WGS84_F = 1 / 298.257223563 WGS84_E2 = WGS84_F * (2 - WGS84_F) def _dm_to_decimal(dm_value: float) -> float: """ 将度分格式 (DDMM.MMMM) 转为十进制度 """ degrees = int(dm_value // 100) minutes = dm_value - degrees * 100 return degrees + minutes / 60.0 def _parse_origin_geo(origin_geo: str) -> Tuple[float, float, Optional[float]]: """ 解析原点坐标字符串,支持三种格式: 1. 标准格式: "3949.9120,N,11616.8544,E" 2. 带前导逗号: ",3949.9120,N,11616.8544,E" 3. GGA报文: "$GNGGA,060956.700,3949.8890014,N,11616.7555551,E,4,20,0.68,46.621,M,-6.679,M,1.0,0409*7F" Returns: (lat, lon, alt) 如果原点字符串中不包含高度信息,则 alt 返回 None """ origin_geo = origin_geo.strip() # 判断是否为 GGA 报文 (以 $G 开头,包含 GGA) if origin_geo.startswith('$') and 'GGA' in origin_geo: # 解析 GGA 报文 parts = origin_geo.split(',') if len(parts) < 10: raise ValueError(f"GGA 报文格式错误或不完整: {origin_geo}") # GGA 格式: $xxGGA,time,lat,NS,lon,EW,quality,numSV,HDOP,alt,altUnit... # parts[2]: 纬度 (ddmm.mmmmm) # parts[3]: N/S # parts[4]: 经度 (dddmm.mmmmm) # parts[5]: E/W # parts[9]: 高度 lat_dm_str = parts[2] lat_dir = parts[3] lon_dm_str = parts[4] lon_dir = parts[5] alt_str = parts[9] if not (lat_dm_str and lat_dir and lon_dm_str and lon_dir and alt_str): raise ValueError("GGA 报文缺少必要的坐标或高度信息") lat_dm = float(lat_dm_str) lon_dm = float(lon_dm_str) alt = float(alt_str) lat = _dm_to_decimal(lat_dm) lon = _dm_to_decimal(lon_dm) if lat_dir.upper() == 'S': lat = -lat if lon_dir.upper() == 'W': lon = -lon return lat, lon, alt else: # 旧的简略格式处理 if origin_geo.startswith(','): origin_geo = origin_geo[1:] parts = [p.strip() for p in origin_geo.split(',')] parts = [p for p in parts if p] if len(parts) != 4: raise ValueError("原点坐标格式应为 '纬度度分,N/S,经度度分,E/W' 或 GGA 报文") try: lat_dm = float(parts[0]) lon_dm = float(parts[2]) except ValueError as exc: raise ValueError("度分字段必须是数字") from exc lat = _dm_to_decimal(lat_dm) lon = _dm_to_decimal(lon_dm) lat_dir = parts[1].upper() lon_dir = parts[3].upper() if lat_dir not in ("N", "S"): raise ValueError("纬度方向必须为 N 或 S") if lon_dir not in ("E", "W"): raise ValueError("经度方向必须为 E 或 W") if lat_dir == "S": lat = -lat if lon_dir == "W": lon = -lon return lat, lon, None def _geo_to_ecef(lat_deg: float, lon_deg: float, alt_m: float) -> Tuple[float, float, float]: """将经纬度/高度转换为ECEF坐标""" lat_rad = math.radians(lat_deg) lon_rad = math.radians(lon_deg) sin_lat = math.sin(lat_rad) cos_lat = math.cos(lat_rad) sin_lon = math.sin(lon_rad) cos_lon = math.cos(lon_rad) N = WGS84_A / math.sqrt(1 - WGS84_E2 * sin_lat * sin_lat) x = (N + alt_m) * cos_lat * cos_lon y = (N + alt_m) * cos_lat * sin_lon z = (N * (1 - WGS84_E2) + alt_m) * sin_lat return x, y, z def _ecef_to_enu(dx: float, dy: float, dz: float, lat0_rad: float, lon0_rad: float) -> Tuple[float, float, float]: """将ECEF差值转换为ENU坐标""" sin_lat = math.sin(lat0_rad) cos_lat = math.cos(lat0_rad) sin_lon = math.sin(lon0_rad) cos_lon = math.cos(lon0_rad) east = -sin_lon * dx + cos_lon * dy north = -sin_lat * cos_lon * dx - sin_lat * sin_lon * dy + cos_lat * dz up = cos_lat * cos_lon * dx + cos_lat * sin_lon * dy + sin_lat * dz return east, north, up if QtWidgets: class ZoomableGraphicsView(QtWidgets.QGraphicsView): """支持滚轮缩放与拖拽的视图""" def __init__(self, scene, parent=None): super().__init__(scene, parent) self.setRenderHint(QtGui.QPainter.Antialiasing, True) self.setDragMode(QtWidgets.QGraphicsView.ScrollHandDrag) self.setTransformationAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse) self.setResizeAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse) self.setViewportUpdateMode(QtWidgets.QGraphicsView.SmartViewportUpdate) self._zoom = 0 def wheelEvent(self, event: QtGui.QWheelEvent): zoom_in_factor = 1.15 zoom_out_factor = 1 / zoom_in_factor if event.angleDelta().y() > 0: factor = zoom_in_factor self._zoom += 1 else: factor = zoom_out_factor self._zoom -= 1 if self._zoom > 50: self._zoom = 50 return if self._zoom < -50: self._zoom = -50 return self.scale(factor, factor) def reset_zoom(self): self._zoom = 0 self.resetTransform() class QtRealtimeDashboard(QtWidgets.QMainWindow): """主线程中的 Qt 状态面板,定时从队列取数据刷新""" def __init__( self, controller, data_queue: queue.Queue, gui_state: Dict[str, str], persist_state: Callable[[Dict[str, str]], None], ): super().__init__() self.controller = controller self.data_queue = data_queue self.gui_state = gui_state or {} self.persist_state_cb = persist_state self.control_thread: Optional[threading.Thread] = None self._last_gui_consume_log = 0.0 self._fit_applied = False self._path_rect = QtCore.QRectF() self.path_points = list(controller.path_points or []) self.setWindowTitle("Lawnmower 运动控制调试界面") self.resize(1200, 680) self._build_ui() self.set_path_points(self.path_points, refit=True) self._refresh_serial_ports(initial=True) self._apply_saved_values() self._update_serial_ui() self._update_control_buttons() self.timer = QtCore.QTimer() self.timer.timeout.connect(self._drain_queue) self.timer.start(80) # ~12.5Hz 刷新 def _build_ui(self): central = QtWidgets.QWidget() layout = QtWidgets.QHBoxLayout(central) self.setCentralWidget(central) self.scene = QtWidgets.QGraphicsScene() self.view = ZoomableGraphicsView(self.scene) layout.addWidget(self.view, stretch=3) self.path_pen = QtGui.QPen(QtGui.QColor("gray")) self.path_pen.setStyle(QtCore.Qt.DashLine) self.path_pen.setWidthF(0.8) self.path_pen.setCosmetic(True) self.path_item = self.scene.addPath(QtGui.QPainterPath(), self.path_pen) trail_pen = QtGui.QPen(QtGui.QColor("blue")) trail_pen.setWidthF(1.0) trail_pen.setCosmetic(True) self.trail_path_item = self.scene.addPath(QtGui.QPainterPath(), trail_pen) self.heading_item = self.scene.addLine(0, 0, 0, 0, QtGui.QPen(QtGui.QColor("red"), 2)) robot_pen = QtGui.QPen(QtGui.QColor("red")) robot_brush = QtGui.QBrush(QtGui.QColor("red")) self.robot_item = self.scene.addEllipse(-0.3, -0.3, 0.6, 0.6, robot_pen, robot_brush) target_pen = QtGui.QPen(QtGui.QColor("green")) target_brush = QtGui.QBrush(QtGui.QColor("green")) self.target_item = self.scene.addEllipse(-0.3, -0.3, 0.6, 0.6, target_pen, target_brush) right_panel = QtWidgets.QWidget() right_layout = QtWidgets.QVBoxLayout(right_panel) right_layout.setContentsMargins(8, 0, 0, 0) layout.addWidget(right_panel, stretch=2) path_group = QtWidgets.QGroupBox("路径文件") path_layout = QtWidgets.QVBoxLayout(path_group) path_row = QtWidgets.QHBoxLayout() self.path_line = QtWidgets.QLineEdit() self.path_browse_btn = QtWidgets.QPushButton("浏览") self.path_load_btn = QtWidgets.QPushButton("加载") path_row.addWidget(self.path_line, stretch=1) path_row.addWidget(self.path_browse_btn) path_layout.addLayout(path_row) path_layout.addWidget(self.path_load_btn, alignment=QtCore.Qt.AlignRight) right_layout.addWidget(path_group) origin_group = QtWidgets.QGroupBox("原点 (GGA)") origin_layout = QtWidgets.QVBoxLayout(origin_group) self.origin_input = QtWidgets.QLineEdit() self.origin_update_btn = QtWidgets.QPushButton("更新原点") origin_layout.addWidget(self.origin_input) origin_layout.addWidget(self.origin_update_btn, alignment=QtCore.Qt.AlignRight) right_layout.addWidget(origin_group) serial_group = QtWidgets.QGroupBox("串口设置") serial_layout = QtWidgets.QVBoxLayout(serial_group) serial_row = QtWidgets.QHBoxLayout() self.port_combo = QtWidgets.QComboBox() self.port_refresh_btn = QtWidgets.QPushButton("刷新") serial_row.addWidget(self.port_combo, stretch=1) serial_row.addWidget(self.port_refresh_btn) serial_layout.addLayout(serial_row) self.serial_toggle_btn = QtWidgets.QPushButton("打开串口") serial_layout.addWidget(self.serial_toggle_btn) right_layout.addWidget(serial_group) control_group = QtWidgets.QGroupBox("运动控制") control_layout = QtWidgets.QHBoxLayout(control_group) self.start_button = QtWidgets.QPushButton("开始") self.stop_button = QtWidgets.QPushButton("停止") self.stop_button.setEnabled(False) control_layout.addWidget(self.start_button) control_layout.addWidget(self.stop_button) right_layout.addWidget(control_group) follow_row = QtWidgets.QHBoxLayout() self.follow_checkbox = QtWidgets.QCheckBox("跟随车辆") self.follow_checkbox.setChecked(True) self.freq_label = QtWidgets.QLabel("控制频率: --.- Hz") self.freq_label.setStyleSheet("font-family: Consolas, 'Courier New'; font-size: 12px;") follow_row.addWidget(self.follow_checkbox) follow_row.addWidget(self.freq_label) follow_row.addStretch() right_layout.addLayout(follow_row) self.info_label = QtWidgets.QLabel() self.info_label.setAlignment(QtCore.Qt.AlignTop | QtCore.Qt.AlignLeft) self.info_label.setStyleSheet("font-family: Consolas, 'Courier New'; font-size: 12px;") info_scroll = QtWidgets.QScrollArea() info_scroll.setWidgetResizable(True) info_scroll.setWidget(self.info_label) info_scroll.setMinimumWidth(320) right_layout.addWidget(info_scroll, stretch=1) if self.controller is not None: self.path_browse_btn.clicked.connect(self._browse_path_file) self.path_load_btn.clicked.connect(self._load_path_from_ui) self.origin_update_btn.clicked.connect(self._update_origin_from_ui) self.port_refresh_btn.clicked.connect(lambda: self._refresh_serial_ports(initial=False)) self.serial_toggle_btn.clicked.connect(self._toggle_serial_connection) self.start_button.clicked.connect(self._start_control) self.stop_button.clicked.connect(self._stop_control) def _apply_saved_values(self): path_file = self.gui_state.get("path_file", "") if path_file: self.path_line.setText(path_file) origin = self.gui_state.get("origin_gga", "") if origin: self.origin_input.setText(origin) saved_port = self.gui_state.get("serial_port", "") if saved_port: index = self.port_combo.findText(saved_port) if index >= 0: self.port_combo.setCurrentIndex(index) def set_path_points(self, path_points, refit=False): self._init_path(path_points or []) if refit: self._fit_applied = False self._ensure_initial_view() def _init_path(self, path_points): self.path_points = list(path_points or []) painter_path = QtGui.QPainterPath() if not self.path_points: self.path_item.setPath(painter_path) self.scene.setSceneRect(-10, -10, 20, 20) self._path_rect = self.scene.sceneRect() return first = True for px, py in self.path_points: py = -py if first: painter_path.moveTo(px, py) first = False else: painter_path.lineTo(px, py) self.path_item.setPath(painter_path) xs = [p[0] for p in self.path_points] ys = [-p[1] for p in self.path_points] margin = 2.0 min_x, max_x = min(xs), max(xs) min_y, max_y = min(ys), max(ys) if min_x == max_x: min_x -= 1 max_x += 1 if min_y == max_y: min_y -= 1 max_y += 1 self.scene.setSceneRect(min_x - margin, min_y - margin, (max_x - min_x) + 2 * margin, (max_y - min_y) + 2 * margin) self._path_rect = self.scene.sceneRect() def _drain_queue(self): updated = False processed = 0 while True: try: data = self.data_queue.get_nowait() except queue.Empty: break self._apply_update(data) updated = True processed += 1 if updated: self.data_queue.task_done() now = time.time() if now - self._last_gui_consume_log > 1.0: print(f"[GUI] Qt面板刷新 {processed} 条数据,队列剩余 {self.data_queue.qsize()}") self._last_gui_consume_log = now def _apply_update(self, data): pos_xyz = data["pos_xyz"] heading_deg = data["heading_deg"] trail_points = data["trail"] target_point = data["target"] info_str = data["info_str"] control_freq = data.get("control_freq_hz") path = QtGui.QPainterPath() first = True for px, py, _pz in trail_points[-1500:]: py = -py if first: path.moveTo(px, py) first = False else: path.lineTo(px, py) self.trail_path_item.setPath(path) x = pos_xyz[0] y = -pos_xyz[1] self.robot_item.setRect(x - 0.3, y - 0.3, 0.6, 0.6) arrow_len = 1.0 heading_rad = math.radians(heading_deg) dx = arrow_len * math.cos(heading_rad) dy = arrow_len * math.sin(heading_rad) self.heading_item.setLine(x, y, x + dx, y - dy) if target_point: tx = target_point[0] ty = -target_point[1] self.target_item.setRect(tx - 0.3, ty - 0.3, 0.6, 0.6) else: self.target_item.setRect(0, 0, 0, 0) self.info_label.setText(info_str) if self.follow_checkbox.isChecked(): self._center_on_robot(x, y) if control_freq is not None: self.freq_label.setText(f"控制频率: {control_freq:5.1f} Hz") def _ensure_initial_view(self): if not self._fit_applied and not self._path_rect.isNull(): self.view.reset_zoom() self.view.fitInView(self._path_rect, QtCore.Qt.KeepAspectRatio) self._fit_applied = True def _center_on_robot(self, scene_x, scene_y): self.view.centerOn(scene_x, scene_y) def _browse_path_file(self): current = self.path_line.text().strip() start_dir = Path(current).parent if current else BASE_DIR file_name, _ = QtWidgets.QFileDialog.getOpenFileName( self, "选择路径文件", str(start_dir), "路径文件 (*.txt *.json);;所有文件 (*)", ) if file_name: self.path_line.setText(file_name) def _load_path_from_ui(self): if self.controller.is_running(): QtWidgets.QMessageBox.warning(self, "路径", "请先停止运动控制,再加载路径。") return path_file = self.path_line.text().strip() if not path_file: QtWidgets.QMessageBox.warning(self, "路径", "请先选择路径文件。") return if not Path(path_file).exists(): QtWidgets.QMessageBox.warning(self, "路径", "路径文件不存在。") return if self.controller.load_path(path_file): self.set_path_points(self.controller.path_points, refit=True) self._save_state({"path_file": path_file}) QtWidgets.QMessageBox.information(self, "路径", "路径加载成功。") self._update_control_buttons() else: QtWidgets.QMessageBox.warning(self, "路径", "路径加载失败,请检查文件内容。") def _update_origin_from_ui(self): if self.controller.is_running(): QtWidgets.QMessageBox.warning(self, "原点", "请先停止运动控制,再更新原点。") return origin = self.origin_input.text().strip() try: self.controller.set_origin_geo(origin) except ValueError as exc: QtWidgets.QMessageBox.warning(self, "原点", f"原点解析失败: {exc}") return self._save_state({"origin_gga": origin}) QtWidgets.QMessageBox.information(self, "原点", "原点已更新。") def _refresh_serial_ports(self, initial: bool = False): ports = [p.device for p in list_ports.comports()] saved_port = self.gui_state.get("serial_port", "") if initial and saved_port and saved_port not in ports: ports.append(saved_port) current = self.port_combo.currentText() self.port_combo.blockSignals(True) self.port_combo.clear() self.port_combo.addItems(ports) target = saved_port or current if target: index = self.port_combo.findText(target) if index >= 0: self.port_combo.setCurrentIndex(index) self.port_combo.blockSignals(False) def _toggle_serial_connection(self): QtWidgets.QMessageBox.warning(self, "串口", "HITL 模式下该操作由仿真器控制") def _start_control(self): QtWidgets.QMessageBox.warning(self, "控制", "HITL 模式下请通过仿真器启动") def _stop_control(self): QtWidgets.QMessageBox.warning(self, "控制", "HITL 模式下请通过仿真器停止") def _is_control_running(self) -> bool: return False def _save_state(self, updates: Dict[str, str]): self.gui_state.update(updates) if self.persist_state_cb: self.persist_state_cb(updates) def _update_serial_ui(self): self.serial_toggle_btn.setEnabled(False) self.port_combo.setEnabled(False) self.port_refresh_btn.setEnabled(False) def _update_control_buttons(self): self.start_button.setEnabled(False) self.stop_button.setEnabled(False) @QtCore.pyqtSlot() def _on_control_finished(self): pass @QtCore.pyqtSlot(str) def _show_error_message(self, message: str): QtWidgets.QMessageBox.warning(self, "错误", message) def closeEvent(self, event): super().closeEvent(event) else: QtRealtimeDashboard = None class GuiBridge: """控制线程向 Qt 主线程发送数据的桥接器""" def __init__(self): self.queue: queue.Queue = queue.Queue(maxsize=50) self._last_publish_log = 0.0 def publish(self, data): try: self.queue.put_nowait(data) except queue.Full: try: self.queue.get_nowait() except queue.Empty: pass try: self.queue.put_nowait(data) except queue.Full: pass now = time.time() if now - self._last_publish_log > 1.0: print(f"[GUI] 发布数据到队列,当前长度 {self.queue.qsize()},显示位置 {data.get('pos_xyz')}") self._last_publish_log = now class ControlCommandSender: """控制命令发送器 - 发送转向和油门控制信号给STM32""" # 协议常量 FRAME_HEADER1 = 0xAA FRAME_HEADER2 = 0x55 TYPE_CONTROL = 0x10 # 控制命令类型 FRAME_FOOTER1 = 0x0D FRAME_FOOTER2 = 0x0A def __init__(self, serial_port: serial.Serial): """ 初始化控制命令发送器 Args: serial_port: 已打开的串口对象 """ self.serial = serial_port self.send_count = 0 def send_control_command(self, steering_pwm: int, throttle_pwm: int) -> bool: """ 发送控制命令到STM32 协议格式: AA 55 10 04 00 STEER(2) THROTTLE(2) CHECKSUM(2) 0D 0A Args: steering_pwm: 转向PWM值 (1000-2000us) throttle_pwm: 油门PWM值 (1000-2000us) Returns: 发送成功返回True """ try: # 限制PWM范围 steering_pwm = max(1000, min(2000, steering_pwm)) throttle_pwm = max(1000, min(2000, throttle_pwm)) # 构建数据包 # 帧头 (2B) + 类型 (1B) + 长度 (2B) + 数据 (4B) + 校验和 (2B) + 帧尾 (2B) data_len = 4 # 2字节转向 + 2字节油门 payload = struct.pack(' bool: """ 加载路径文件并初始化控制器 支持 .json 和 .txt (X,Y;X,Y...) 格式 Args: path_file: 路径文件路径 Returns: 成功返回True """ try: with open(path_file, 'r', encoding='utf-8') as f: content = f.read().strip() path_points = [] # 尝试解析 JSON try: if path_file.endswith('.json') or content.startswith('['): path_points = json.loads(content) except json.JSONDecodeError: # 假如不是JSON,尝试解析 TXT 格式 pass # 如果 JSON 解析未成功或未尝试,则尝试 TXT 解析 if not path_points: # TXT 格式: X,Y;X,Y;... points_str = content.split(';') for p_str in points_str: if not p_str.strip(): continue try: parts = p_str.split(',') if len(parts) >= 2: x = float(parts[0]) y = float(parts[1]) path_points.append([x, y]) except ValueError: print(f"[WARN] 忽略无效点: {p_str}") if not path_points: print("[ERROR] 未能解析出有效路径点") return False print(f"[INFO] 加载路径: {len(path_points)} 个点") # 使用sim_offline.py中的高精度参数 params = { # 核心控制增益 'k_heading': 2.0, 'k_xte': 0.8, 'k_heading_d': 0.1, 'k_heading_i': 0.015, 'k_xte_i': 0.02, # 速度控制 'base_speed': 0.8, 'max_forward_mps': 1.2, 'min_follow_speed': 0.40, 'max_reverse_mps': 0.25, 'max_yawrate': math.pi/2.5, # 前视距离 'lookahead_min': 0.35, 'lookahead_max': 1.2, 'lookahead_time': 2.0, # 误差容忍度 'max_xte': 1.5, 'goal_tolerance': 0.1, 'large_err_threshold': 0.5, # 滤波参数 'yawrate_alpha': 0.85, # 速度缩放增益 'curv_gain_scale': 2.0, 'heading_err_gain_scale': 1.5, 'xte_gain_scale': 1.0, # 终点减速区域 'slow_zone': 1.5, 'final_approach_dist': 0.8, } self.mower_ctrl = MowerController(path_points, params=params) self.path_points = path_points self.trail_points = [] print("[INFO] 运动控制器初始化完成") return True except Exception as e: print(f"[ERROR] 加载路径失败: {e}") return False def connect(self) -> bool: """连接串口""" if self.is_connected(): print("[INFO] 串口已连接") return True if not self.port: print("[ERROR] 未指定串口端口") return False self.receiver = GPSIMUReceiver(self.port, self.baudrate) if not self.receiver.connect(): self.receiver = None return False # 创建控制命令发送器 (共用串口对象) self.cmd_sender = ControlCommandSender(self.receiver.serial) print("[INFO] 控制命令发送器就绪") return True def disconnect(self): """断开连接""" if self.receiver: self.receiver.disconnect() self.receiver = None self.cmd_sender = None def is_connected(self) -> bool: return bool(self.receiver and self.receiver.serial and self.receiver.serial.is_open) def is_running(self) -> bool: return self._running def set_port(self, port: str): if self.is_connected(): raise RuntimeError("请先关闭当前串口,再切换端口") self.port = port.strip() def stop(self): """请求停止控制循环""" self._stop_event.set() def _update_visualizer( self, pos_xyz, heading_deg: float, pitch: float, roll: float, forward: int, turn: int, status: str, stage: str, target_xy: Optional[Tuple[float, float]], ): if not self.gui_bridge: return info_lines = [ f"状态: {status}", f"阶段: {stage}", f"控制频率: {self.control_freq_hz:5.1f} Hz", "", "位置 (ENU, m):", f" E: {pos_xyz[0]:+7.2f}", f" N: {pos_xyz[1]:+7.2f}", f" U: {pos_xyz[2]:+7.2f}", "", "姿态 (deg):", f" Heading: {heading_deg:+7.2f}", f" Pitch : {pitch:+7.2f}", f" Roll : {roll:+7.2f}", "", f"控制输出: F={forward:+4d}, T={turn:+4d}", ] if target_xy: info_lines.append(f"瞄准点: ({target_xy[0]:+.2f}, {target_xy[1]:+.2f})") info_str = "\n".join(info_lines) if self.gui_bridge: trail_snapshot = list(self.trail_points[-1500:]) payload = { "info_str": info_str, "pos_xyz": pos_xyz, "heading_deg": heading_deg, "trail": trail_snapshot, "target": target_xy, "control_freq_hz": self.control_freq_hz, } self.gui_bridge.publish(payload) now = time.time() if now - self._last_gui_publish_log > 1.0: print(f"[GUI] 控制线程推送数据 pos={pos_xyz}, heading={heading_deg:.2f}, trail={len(trail_snapshot)}, target={target_xy}") self._last_gui_publish_log = now def _record_control_tick(self, current_time: float): if self._freq_sample_time is None: self._freq_sample_time = current_time self._freq_sample_count = 1 return self._freq_sample_count += 1 window = current_time - self._freq_sample_time if window >= 1.0: self.control_freq_hz = self._freq_sample_count / max(window, 1e-6) self._freq_sample_count = 0 self._freq_sample_time = current_time def _apply_origin_geo(self, origin_geo: Optional[str]): origin_geo = (origin_geo or "").strip() self.origin_latlon = None self.origin_ecef = None self.origin_lat_rad = None self.origin_lon_rad = None if not origin_geo: return lat, lon, alt = _parse_origin_geo(origin_geo) self.origin_latlon = (lat, lon) if alt is not None: self.origin_altitude = alt print(f"[INFO] 从原点配置中解析出高度: {self.origin_altitude:.3f} m") print(f"[INFO] 坐标原点设置: 纬度={self.origin_latlon[0]:.8f}°, 经度={self.origin_latlon[1]:.8f}°") def set_origin_geo(self, origin_geo: str): """更新原点坐标配置""" self.origin_geo = origin_geo.strip() self._apply_origin_geo(self.origin_geo) def _ensure_origin_reference(self, fallback_altitude: float): """确保已根据原点经纬度初始化ECEF基准""" if not self.origin_latlon or self.origin_ecef: return lat0, lon0 = self.origin_latlon if self.origin_altitude is None: self.origin_altitude = fallback_altitude print(f"[INFO] 未指定原点高度,使用首个GPS高度 {self.origin_altitude:.3f} m") self.origin_lat_rad = math.radians(lat0) self.origin_lon_rad = math.radians(lon0) self.origin_ecef = _geo_to_ecef(lat0, lon0, self.origin_altitude) print(f"[INFO] 原点ECEF参考初始化完成 (Alt={self.origin_altitude:.3f} m)") def _convert_gps_to_local(self, gps: GPSData) -> tuple: """ 将GPS数据转换为本地坐标 (简化版,实际需要根据起点转换) Returns: (x, y, z, heading_rad, vx, vy) """ # TODO: 实际应用中需要将经纬度转换为相对起点的XY坐标 # 这里简化处理,假设GPS已提供相对坐标或使用UTM转换 # 航向角转换为弧度 (GPS输出0-360°,北为0°顺时针) # 需要转换为数学坐标系 (东为0°逆时针) heading_rad = math.radians(90 - gps.heading_angle) heading_rad = wrap_angle(heading_rad) # 速度分量 (GPS提供东北天坐标系) vx = gps.east_velocity # 东向速度 = X轴速度 vy = gps.north_velocity # 北向速度 = Y轴速度 # 位置转换为ENU坐标 lat = gps.latitude lon = gps.longitude x = 0.0 y = 0.0 z = 0.0 if self.origin_latlon: self._ensure_origin_reference(gps.altitude) if self.origin_latlon and self.origin_ecef and self.origin_lat_rad is not None and self.origin_lon_rad is not None: current_ecef = _geo_to_ecef(lat, lon, gps.altitude) dx = current_ecef[0] - self.origin_ecef[0] dy = current_ecef[1] - self.origin_ecef[1] dz = current_ecef[2] - self.origin_ecef[2] east, north, up = _ecef_to_enu(dx, dy, dz, self.origin_lat_rad, self.origin_lon_rad) x = east y = north z = up else: x = lon * 111320.0 * math.cos(math.radians(lat)) # 近似米 y = lat * 110540.0 # 近似米 z = gps.altitude # 粗略使用GPS高度 # GPS天线 → 车辆中心的平移修正 (ENU坐标系) dx, dy, dz_offset = self.offset_xyz x += dx y += dy z += dz_offset return (x, y, z, heading_rad, vx, vy) def _control_to_pwm(self, forward: int, turn: int) -> tuple: """ 将控制信号转换为PWM值 Args: forward: 前进信号 (-100 到 100) turn: 转向信号 (-100 到 100, 负为右转, 正为左转) Returns: (steering_pwm, throttle_pwm) 单位us """ # 油门: -100~100 映射到 1000~2000us, 中位1500us throttle_pwm = int(1500 + forward * 5.0) # 转向: -100~100 映射到 1000~2000us, 中位1500us # 注意: 正值左转(1000), 负值右转(2000) steering_pwm = int(1500 - turn * 5.0) return (steering_pwm, throttle_pwm) def run(self): """运行实时控制循环""" if not self.mower_ctrl: print("[ERROR] 请先加载路径文件") return if self._running: print("[WARN] 控制循环已在运行") return if not self.is_connected() or not self.receiver: print("[ERROR] 请先连接串口") return if not self.cmd_sender: print("[ERROR] 控制命令发送器未就绪") return self._stop_event.clear() self._running = True print("[INFO] 开始实时控制... (按 Ctrl+C 停止)") print("[INFO] 等待GPS数据...") receiver = self.receiver try: start_time = time.time() last_stats_time = start_time self.last_log_time = start_time # 等待第一个有效的GPS数据 gps_ready = False while not gps_ready and not self._stop_event.is_set(): gps_data, imu_data = self.receiver.receive_packet() if gps_data: if gps_data.position_quality > 0 and gps_data.satellite_count >= 4: self.latest_gps = gps_data gps_ready = True print(f"[INFO] GPS就绪: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}") else: print(f"[WARN] GPS质量不足: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}") if imu_data: self.latest_imu = imu_data if self._stop_event.is_set(): return print("[INFO] 开始控制循环") while not self._stop_event.is_set(): current_time = time.time() # 接收数据 (非阻塞式,快速轮询) gps_data, imu_data = receiver.receive_packet() if receiver else (None, None) if gps_data: self.latest_gps = gps_data self.last_gps_time = current_time if imu_data: self.latest_imu = imu_data self.last_imu_time = current_time # 控制周期检查 (74Hz) if current_time - self.last_control_time >= self.control_period: if self.latest_gps: # 转换GPS数据到本地坐标 x, y, z, heading, vx, vy = self._convert_gps_to_local(self.latest_gps) self.trail_points.append((x, y, z)) if len(self.trail_points) > 4000: self.trail_points.pop(0) # 更新控制器状态 (只用x,y) self.mower_ctrl.update_gps((x, y), heading, (vx, vy), current_time) if self.latest_imu: accel = (self.latest_imu.accel_x, self.latest_imu.accel_y, self.latest_imu.accel_z) gyro = (self.latest_imu.gyro_x, self.latest_imu.gyro_y, self.latest_imu.gyro_z) self.mower_ctrl.update_imu(accel, gyro, current_time) # 计算控制信号 control_output = self.mower_ctrl.compute_control(current_time) control_info = control_output.get('info', {}) forward_signal = control_output['forward'] # -100~100 turn_signal = control_output['turn'] # -100~100 # 转换为PWM并发送 steering_pwm, throttle_pwm = self._control_to_pwm(forward_signal, turn_signal) self.cmd_sender.send_control_command(steering_pwm, throttle_pwm) self.control_count += 1 self._record_control_tick(current_time) self.last_control_time = current_time gps_heading = self.latest_gps.heading_angle gps_pitch = self.latest_gps.pitch_angle gps_roll = self.latest_gps.roll_angle # 打印控制信息 (2Hz) if current_time - self.last_log_time >= 0.5: status = control_info.get('status', 'running') xte = control_info.get('xte', 0) # 数学角度转地理角度:heading_err_deg # 注意 control_output['info']['heading_err'] 是弧度 heading_err_deg = math.degrees(control_info.get('heading_err', 0)) print(f"[LOG] POS_ENU: {x:.3f}, {y:.3f}, {z:.3f} | " f"ATT(H/P/R): {gps_heading:.2f}°, {gps_pitch:.2f}°, {gps_roll:.2f}° | " f"CTRL: S={status}, F={forward_signal}, T={turn_signal}, Err={xte:.2f}m") self.last_log_time = current_time stage_label = control_info.get('stage', getattr(self.mower_ctrl, 'stage', '')) target_xy = control_info.get('target_xy') self._update_visualizer( (x, y, z), gps_heading, gps_pitch, gps_roll, forward_signal, turn_signal, control_info.get('status', 'running'), stage_label if stage_label else 'unknown', target_xy, ) # 检查是否完成 if control_info.get('status') == 'finished': print("[INFO] 路径跟踪完成!") self.stop() break # 统计信息 (每10秒) # if current_time - last_stats_time > 10.0: # print(f"\n========== 运行统计 ==========") # print(f"运行时间: {current_time - start_time:.1f}s") # print(f"控制次数: {self.control_count}") # print(f"GPS数据包: {self.receiver.gps_count}") # print(f"IMU数据包: {self.receiver.imu_count}") # print(f"命令发送: {self.cmd_sender.get_stats()}") # print(f"错误计数: {self.receiver.error_count}") # if self.latest_gps: # print(f"GPS状态: 质量={self.latest_gps.position_quality}, " # f"卫星={self.latest_gps.satellite_count}, " # f"航向={self.latest_gps.heading_angle:.1f}°") # print(f"==============================\n") # last_stats_time = current_time except KeyboardInterrupt: print("\n[INFO] 用户中断") self.stop() finally: self._running = False self._stop_event.clear() # 发送停止命令 (中位值) if self.cmd_sender: print("[INFO] 发送停止命令...") for _ in range(5): self.cmd_sender.send_control_command(1500, 1500) time.sleep(0.02) # 打印最终统计 print("\n========== 最终统计 ==========") print(f"总控制次数: {self.control_count}") if receiver: print(f"GPS数据包: {receiver.gps_count}") print(f"IMU数据包: {receiver.imu_count}") if self.cmd_sender: print(f"命令发送: {self.cmd_sender.get_stats()}") if receiver: print(f"错误计数: {receiver.error_count}") print("==============================\n") def main(): """主函数""" BAUDRATE = 921600 DEFAULT_PORT = "COM17" DEFAULT_PATH_FILE = "gecaolujing2.txt" DEFAULT_ORIGIN_GGA = "$GNGGA,060956.700,3949.8890014,N,11616.7555551,E,4,20,0.68,46.621,M,-6.679,M,1.0,0409*7F" OFFSET_XYZ = (0.0, 0.0, 0.0) gui_state = load_gui_state() selected_port = gui_state.get("serial_port") or DEFAULT_PORT origin_geo = gui_state.get("origin_gga") or DEFAULT_ORIGIN_GGA path_file = gui_state.get("path_file") gui_bridge = GuiBridge() if QtWidgets else None controller = RealtimeController( selected_port, BAUDRATE, offset_xyz=OFFSET_XYZ, origin_geo=origin_geo, origin_alt_m=None, gui_bridge=gui_bridge, ) # 预加载路径文件(若存在) candidate_path = path_file or DEFAULT_PATH_FILE if candidate_path and Path(candidate_path).exists(): controller.load_path(candidate_path) def persist_state(updates: Dict[str, str]): gui_state.update(updates) save_gui_state(gui_state) if gui_bridge and QtWidgets: app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([]) dashboard = QtRealtimeDashboard(controller, gui_bridge.queue, gui_state, persist_state) dashboard.show() try: app.exec_() finally: controller.stop() controller.disconnect() else: # 无Qt环境时,沿用旧的命令行工作流 if not controller.path_points and Path(DEFAULT_PATH_FILE).exists(): controller.load_path(DEFAULT_PATH_FILE) if not controller.connect(): print("[ERROR] 串口连接失败,退出") return try: controller.run() finally: controller.disconnect() if __name__ == "__main__": main()