#!/usr/bin/env python3
|
# -*- coding: utf-8 -*-
|
"""
|
实时闭环控制系统
|
接收 GPS/IMU 数据 → 运动控制算法 → 下发控制命令
|
"""
|
|
import serial
|
import struct
|
import time
|
import json
|
import math
|
import threading
|
import queue
|
from pathlib import Path
|
from typing import Callable, Dict, Optional, Tuple
|
|
from serial.tools import list_ports
|
from gps_imu_receiver import GPSIMUReceiver, GPSData, IMUData
|
from mower_controller import MowerController, wrap_angle
|
|
try:
|
from PyQt5 import QtCore, QtGui, QtWidgets
|
except Exception:
|
QtWidgets = None
|
|
BASE_DIR = Path(__file__).resolve().parent
|
GUI_STATE_FILE = BASE_DIR / "gui_state.json"
|
GUI_STATE_DEFAULT = {
|
"path_file": "",
|
"origin_gga": "",
|
"serial_port": "",
|
}
|
|
|
def load_gui_state() -> Dict[str, str]:
|
state = dict(GUI_STATE_DEFAULT)
|
try:
|
if GUI_STATE_FILE.exists():
|
with GUI_STATE_FILE.open('r', encoding='utf-8') as f:
|
data = json.load(f)
|
if isinstance(data, dict):
|
state.update(data)
|
except Exception as exc:
|
print(f"[WARN] 读取 GUI 配置失败: {exc}")
|
return state
|
|
|
def save_gui_state(state: Dict[str, str]) -> None:
|
try:
|
GUI_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
with GUI_STATE_FILE.open('w', encoding='utf-8') as f:
|
json.dump(state, f, ensure_ascii=False, indent=2)
|
except Exception as exc:
|
print(f"[WARN] 保存 GUI 配置失败: {exc}")
|
|
|
EARTH_RADIUS_M = 6378137.0
|
WGS84_A = 6378137.0 # 半长轴
|
WGS84_F = 1 / 298.257223563
|
WGS84_E2 = WGS84_F * (2 - WGS84_F)
|
|
|
def _dm_to_decimal(dm_value: float) -> float:
|
"""
|
将度分格式 (DDMM.MMMM) 转为十进制度
|
"""
|
degrees = int(dm_value // 100)
|
minutes = dm_value - degrees * 100
|
return degrees + minutes / 60.0
|
|
|
def _parse_origin_geo(origin_geo: str) -> Tuple[float, float, Optional[float]]:
|
"""
|
解析原点坐标字符串,支持三种格式:
|
1. 标准格式: "3949.9120,N,11616.8544,E"
|
2. 带前导逗号: ",3949.9120,N,11616.8544,E"
|
3. GGA报文: "$GNGGA,060956.700,3949.8890014,N,11616.7555551,E,4,20,0.68,46.621,M,-6.679,M,1.0,0409*7F"
|
|
Returns:
|
(lat, lon, alt)
|
如果原点字符串中不包含高度信息,则 alt 返回 None
|
"""
|
origin_geo = origin_geo.strip()
|
|
# 判断是否为 GGA 报文 (以 $G 开头,包含 GGA)
|
if origin_geo.startswith('$') and 'GGA' in origin_geo:
|
# 解析 GGA 报文
|
parts = origin_geo.split(',')
|
if len(parts) < 10:
|
raise ValueError(f"GGA 报文格式错误或不完整: {origin_geo}")
|
|
# GGA 格式: $xxGGA,time,lat,NS,lon,EW,quality,numSV,HDOP,alt,altUnit...
|
# parts[2]: 纬度 (ddmm.mmmmm)
|
# parts[3]: N/S
|
# parts[4]: 经度 (dddmm.mmmmm)
|
# parts[5]: E/W
|
# parts[9]: 高度
|
|
lat_dm_str = parts[2]
|
lat_dir = parts[3]
|
lon_dm_str = parts[4]
|
lon_dir = parts[5]
|
alt_str = parts[9]
|
|
if not (lat_dm_str and lat_dir and lon_dm_str and lon_dir and alt_str):
|
raise ValueError("GGA 报文缺少必要的坐标或高度信息")
|
|
lat_dm = float(lat_dm_str)
|
lon_dm = float(lon_dm_str)
|
alt = float(alt_str)
|
|
lat = _dm_to_decimal(lat_dm)
|
lon = _dm_to_decimal(lon_dm)
|
|
if lat_dir.upper() == 'S':
|
lat = -lat
|
if lon_dir.upper() == 'W':
|
lon = -lon
|
|
return lat, lon, alt
|
|
else:
|
# 旧的简略格式处理
|
if origin_geo.startswith(','):
|
origin_geo = origin_geo[1:]
|
|
parts = [p.strip() for p in origin_geo.split(',')]
|
parts = [p for p in parts if p]
|
|
if len(parts) != 4:
|
raise ValueError("原点坐标格式应为 '纬度度分,N/S,经度度分,E/W' 或 GGA 报文")
|
|
try:
|
lat_dm = float(parts[0])
|
lon_dm = float(parts[2])
|
except ValueError as exc:
|
raise ValueError("度分字段必须是数字") from exc
|
|
lat = _dm_to_decimal(lat_dm)
|
lon = _dm_to_decimal(lon_dm)
|
|
lat_dir = parts[1].upper()
|
lon_dir = parts[3].upper()
|
|
if lat_dir not in ("N", "S"):
|
raise ValueError("纬度方向必须为 N 或 S")
|
if lon_dir not in ("E", "W"):
|
raise ValueError("经度方向必须为 E 或 W")
|
|
if lat_dir == "S":
|
lat = -lat
|
if lon_dir == "W":
|
lon = -lon
|
|
return lat, lon, None
|
|
|
def _geo_to_ecef(lat_deg: float, lon_deg: float, alt_m: float) -> Tuple[float, float, float]:
|
"""将经纬度/高度转换为ECEF坐标"""
|
lat_rad = math.radians(lat_deg)
|
lon_rad = math.radians(lon_deg)
|
sin_lat = math.sin(lat_rad)
|
cos_lat = math.cos(lat_rad)
|
sin_lon = math.sin(lon_rad)
|
cos_lon = math.cos(lon_rad)
|
|
N = WGS84_A / math.sqrt(1 - WGS84_E2 * sin_lat * sin_lat)
|
x = (N + alt_m) * cos_lat * cos_lon
|
y = (N + alt_m) * cos_lat * sin_lon
|
z = (N * (1 - WGS84_E2) + alt_m) * sin_lat
|
return x, y, z
|
|
|
def _ecef_to_enu(dx: float, dy: float, dz: float, lat0_rad: float, lon0_rad: float) -> Tuple[float, float, float]:
|
"""将ECEF差值转换为ENU坐标"""
|
sin_lat = math.sin(lat0_rad)
|
cos_lat = math.cos(lat0_rad)
|
sin_lon = math.sin(lon0_rad)
|
cos_lon = math.cos(lon0_rad)
|
|
east = -sin_lon * dx + cos_lon * dy
|
north = -sin_lat * cos_lon * dx - sin_lat * sin_lon * dy + cos_lat * dz
|
up = cos_lat * cos_lon * dx + cos_lat * sin_lon * dy + sin_lat * dz
|
return east, north, up
|
|
|
if QtWidgets:
|
class ZoomableGraphicsView(QtWidgets.QGraphicsView):
|
"""支持滚轮缩放与拖拽的视图"""
|
|
def __init__(self, scene, parent=None):
|
super().__init__(scene, parent)
|
self.setRenderHint(QtGui.QPainter.Antialiasing, True)
|
self.setDragMode(QtWidgets.QGraphicsView.ScrollHandDrag)
|
self.setTransformationAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse)
|
self.setResizeAnchor(QtWidgets.QGraphicsView.AnchorUnderMouse)
|
self.setViewportUpdateMode(QtWidgets.QGraphicsView.SmartViewportUpdate)
|
self._zoom = 0
|
|
def wheelEvent(self, event: QtGui.QWheelEvent):
|
zoom_in_factor = 1.15
|
zoom_out_factor = 1 / zoom_in_factor
|
|
if event.angleDelta().y() > 0:
|
factor = zoom_in_factor
|
self._zoom += 1
|
else:
|
factor = zoom_out_factor
|
self._zoom -= 1
|
|
if self._zoom > 50:
|
self._zoom = 50
|
return
|
if self._zoom < -50:
|
self._zoom = -50
|
return
|
|
self.scale(factor, factor)
|
|
def reset_zoom(self):
|
self._zoom = 0
|
self.resetTransform()
|
|
|
class QtRealtimeDashboard(QtWidgets.QMainWindow):
|
"""主线程中的 Qt 状态面板,定时从队列取数据刷新"""
|
|
def __init__(
|
self,
|
controller,
|
data_queue: queue.Queue,
|
gui_state: Dict[str, str],
|
persist_state: Callable[[Dict[str, str]], None],
|
):
|
super().__init__()
|
self.controller = controller
|
self.data_queue = data_queue
|
self.gui_state = gui_state or {}
|
self.persist_state_cb = persist_state
|
self.control_thread: Optional[threading.Thread] = None
|
self._last_gui_consume_log = 0.0
|
self._fit_applied = False
|
self._path_rect = QtCore.QRectF()
|
self.path_points = list(controller.path_points or [])
|
|
self.setWindowTitle("Lawnmower 运动控制调试界面")
|
self.resize(1200, 680)
|
|
self._build_ui()
|
self.set_path_points(self.path_points, refit=True)
|
self._refresh_serial_ports(initial=True)
|
self._apply_saved_values()
|
self._update_serial_ui()
|
self._update_control_buttons()
|
|
self.timer = QtCore.QTimer()
|
self.timer.timeout.connect(self._drain_queue)
|
self.timer.start(80) # ~12.5Hz 刷新
|
|
def _build_ui(self):
|
central = QtWidgets.QWidget()
|
layout = QtWidgets.QHBoxLayout(central)
|
self.setCentralWidget(central)
|
|
self.scene = QtWidgets.QGraphicsScene()
|
self.view = ZoomableGraphicsView(self.scene)
|
layout.addWidget(self.view, stretch=3)
|
|
self.path_pen = QtGui.QPen(QtGui.QColor("gray"))
|
self.path_pen.setStyle(QtCore.Qt.DashLine)
|
self.path_pen.setWidthF(0.8)
|
self.path_pen.setCosmetic(True)
|
self.path_item = self.scene.addPath(QtGui.QPainterPath(), self.path_pen)
|
|
trail_pen = QtGui.QPen(QtGui.QColor("blue"))
|
trail_pen.setWidthF(1.0)
|
trail_pen.setCosmetic(True)
|
self.trail_path_item = self.scene.addPath(QtGui.QPainterPath(), trail_pen)
|
|
self.heading_item = self.scene.addLine(0, 0, 0, 0, QtGui.QPen(QtGui.QColor("red"), 2))
|
robot_pen = QtGui.QPen(QtGui.QColor("red"))
|
robot_brush = QtGui.QBrush(QtGui.QColor("red"))
|
self.robot_item = self.scene.addEllipse(-0.3, -0.3, 0.6, 0.6, robot_pen, robot_brush)
|
target_pen = QtGui.QPen(QtGui.QColor("green"))
|
target_brush = QtGui.QBrush(QtGui.QColor("green"))
|
self.target_item = self.scene.addEllipse(-0.3, -0.3, 0.6, 0.6, target_pen, target_brush)
|
|
right_panel = QtWidgets.QWidget()
|
right_layout = QtWidgets.QVBoxLayout(right_panel)
|
right_layout.setContentsMargins(8, 0, 0, 0)
|
layout.addWidget(right_panel, stretch=2)
|
|
path_group = QtWidgets.QGroupBox("路径文件")
|
path_layout = QtWidgets.QVBoxLayout(path_group)
|
path_row = QtWidgets.QHBoxLayout()
|
self.path_line = QtWidgets.QLineEdit()
|
self.path_browse_btn = QtWidgets.QPushButton("浏览")
|
self.path_load_btn = QtWidgets.QPushButton("加载")
|
path_row.addWidget(self.path_line, stretch=1)
|
path_row.addWidget(self.path_browse_btn)
|
path_layout.addLayout(path_row)
|
path_layout.addWidget(self.path_load_btn, alignment=QtCore.Qt.AlignRight)
|
right_layout.addWidget(path_group)
|
|
origin_group = QtWidgets.QGroupBox("原点 (GGA)")
|
origin_layout = QtWidgets.QVBoxLayout(origin_group)
|
self.origin_input = QtWidgets.QLineEdit()
|
self.origin_update_btn = QtWidgets.QPushButton("更新原点")
|
origin_layout.addWidget(self.origin_input)
|
origin_layout.addWidget(self.origin_update_btn, alignment=QtCore.Qt.AlignRight)
|
right_layout.addWidget(origin_group)
|
|
serial_group = QtWidgets.QGroupBox("串口设置")
|
serial_layout = QtWidgets.QVBoxLayout(serial_group)
|
serial_row = QtWidgets.QHBoxLayout()
|
self.port_combo = QtWidgets.QComboBox()
|
self.port_refresh_btn = QtWidgets.QPushButton("刷新")
|
serial_row.addWidget(self.port_combo, stretch=1)
|
serial_row.addWidget(self.port_refresh_btn)
|
serial_layout.addLayout(serial_row)
|
self.serial_toggle_btn = QtWidgets.QPushButton("打开串口")
|
serial_layout.addWidget(self.serial_toggle_btn)
|
right_layout.addWidget(serial_group)
|
|
control_group = QtWidgets.QGroupBox("运动控制")
|
control_layout = QtWidgets.QHBoxLayout(control_group)
|
self.start_button = QtWidgets.QPushButton("开始")
|
self.stop_button = QtWidgets.QPushButton("停止")
|
self.stop_button.setEnabled(False)
|
control_layout.addWidget(self.start_button)
|
control_layout.addWidget(self.stop_button)
|
right_layout.addWidget(control_group)
|
|
follow_row = QtWidgets.QHBoxLayout()
|
self.follow_checkbox = QtWidgets.QCheckBox("跟随车辆")
|
self.follow_checkbox.setChecked(True)
|
self.freq_label = QtWidgets.QLabel("控制频率: --.- Hz")
|
self.freq_label.setStyleSheet("font-family: Consolas, 'Courier New'; font-size: 12px;")
|
follow_row.addWidget(self.follow_checkbox)
|
follow_row.addWidget(self.freq_label)
|
follow_row.addStretch()
|
right_layout.addLayout(follow_row)
|
|
self.info_label = QtWidgets.QLabel()
|
self.info_label.setAlignment(QtCore.Qt.AlignTop | QtCore.Qt.AlignLeft)
|
self.info_label.setStyleSheet("font-family: Consolas, 'Courier New'; font-size: 12px;")
|
info_scroll = QtWidgets.QScrollArea()
|
info_scroll.setWidgetResizable(True)
|
info_scroll.setWidget(self.info_label)
|
info_scroll.setMinimumWidth(320)
|
right_layout.addWidget(info_scroll, stretch=1)
|
|
if self.controller is not None:
|
self.path_browse_btn.clicked.connect(self._browse_path_file)
|
self.path_load_btn.clicked.connect(self._load_path_from_ui)
|
self.origin_update_btn.clicked.connect(self._update_origin_from_ui)
|
self.port_refresh_btn.clicked.connect(lambda: self._refresh_serial_ports(initial=False))
|
self.serial_toggle_btn.clicked.connect(self._toggle_serial_connection)
|
self.start_button.clicked.connect(self._start_control)
|
self.stop_button.clicked.connect(self._stop_control)
|
|
def _apply_saved_values(self):
|
path_file = self.gui_state.get("path_file", "")
|
if path_file:
|
self.path_line.setText(path_file)
|
origin = self.gui_state.get("origin_gga", "")
|
if origin:
|
self.origin_input.setText(origin)
|
saved_port = self.gui_state.get("serial_port", "")
|
if saved_port:
|
index = self.port_combo.findText(saved_port)
|
if index >= 0:
|
self.port_combo.setCurrentIndex(index)
|
|
def set_path_points(self, path_points, refit=False):
|
self._init_path(path_points or [])
|
if refit:
|
self._fit_applied = False
|
self._ensure_initial_view()
|
|
def _init_path(self, path_points):
|
self.path_points = list(path_points or [])
|
painter_path = QtGui.QPainterPath()
|
if not self.path_points:
|
self.path_item.setPath(painter_path)
|
self.scene.setSceneRect(-10, -10, 20, 20)
|
self._path_rect = self.scene.sceneRect()
|
return
|
|
first = True
|
for px, py in self.path_points:
|
py = -py
|
if first:
|
painter_path.moveTo(px, py)
|
first = False
|
else:
|
painter_path.lineTo(px, py)
|
self.path_item.setPath(painter_path)
|
|
xs = [p[0] for p in self.path_points]
|
ys = [-p[1] for p in self.path_points]
|
margin = 2.0
|
min_x, max_x = min(xs), max(xs)
|
min_y, max_y = min(ys), max(ys)
|
if min_x == max_x:
|
min_x -= 1
|
max_x += 1
|
if min_y == max_y:
|
min_y -= 1
|
max_y += 1
|
self.scene.setSceneRect(min_x - margin, min_y - margin, (max_x - min_x) + 2 * margin, (max_y - min_y) + 2 * margin)
|
self._path_rect = self.scene.sceneRect()
|
|
def _drain_queue(self):
|
updated = False
|
processed = 0
|
while True:
|
try:
|
data = self.data_queue.get_nowait()
|
except queue.Empty:
|
break
|
self._apply_update(data)
|
updated = True
|
processed += 1
|
if updated:
|
self.data_queue.task_done()
|
now = time.time()
|
if now - self._last_gui_consume_log > 1.0:
|
print(f"[GUI] Qt面板刷新 {processed} 条数据,队列剩余 {self.data_queue.qsize()}")
|
self._last_gui_consume_log = now
|
|
def _apply_update(self, data):
|
pos_xyz = data["pos_xyz"]
|
heading_deg = data["heading_deg"]
|
trail_points = data["trail"]
|
target_point = data["target"]
|
info_str = data["info_str"]
|
control_freq = data.get("control_freq_hz")
|
|
path = QtGui.QPainterPath()
|
first = True
|
for px, py, _pz in trail_points[-1500:]:
|
py = -py
|
if first:
|
path.moveTo(px, py)
|
first = False
|
else:
|
path.lineTo(px, py)
|
self.trail_path_item.setPath(path)
|
|
x = pos_xyz[0]
|
y = -pos_xyz[1]
|
self.robot_item.setRect(x - 0.3, y - 0.3, 0.6, 0.6)
|
|
arrow_len = 1.0
|
heading_rad = math.radians(heading_deg)
|
dx = arrow_len * math.cos(heading_rad)
|
dy = arrow_len * math.sin(heading_rad)
|
self.heading_item.setLine(x, y, x + dx, y - dy)
|
|
if target_point:
|
tx = target_point[0]
|
ty = -target_point[1]
|
self.target_item.setRect(tx - 0.3, ty - 0.3, 0.6, 0.6)
|
else:
|
self.target_item.setRect(0, 0, 0, 0)
|
|
self.info_label.setText(info_str)
|
if self.follow_checkbox.isChecked():
|
self._center_on_robot(x, y)
|
if control_freq is not None:
|
self.freq_label.setText(f"控制频率: {control_freq:5.1f} Hz")
|
|
def _ensure_initial_view(self):
|
if not self._fit_applied and not self._path_rect.isNull():
|
self.view.reset_zoom()
|
self.view.fitInView(self._path_rect, QtCore.Qt.KeepAspectRatio)
|
self._fit_applied = True
|
|
def _center_on_robot(self, scene_x, scene_y):
|
self.view.centerOn(scene_x, scene_y)
|
|
def _browse_path_file(self):
|
current = self.path_line.text().strip()
|
start_dir = Path(current).parent if current else BASE_DIR
|
file_name, _ = QtWidgets.QFileDialog.getOpenFileName(
|
self,
|
"选择路径文件",
|
str(start_dir),
|
"路径文件 (*.txt *.json);;所有文件 (*)",
|
)
|
if file_name:
|
self.path_line.setText(file_name)
|
|
def _load_path_from_ui(self):
|
if self.controller.is_running():
|
QtWidgets.QMessageBox.warning(self, "路径", "请先停止运动控制,再加载路径。")
|
return
|
path_file = self.path_line.text().strip()
|
if not path_file:
|
QtWidgets.QMessageBox.warning(self, "路径", "请先选择路径文件。")
|
return
|
if not Path(path_file).exists():
|
QtWidgets.QMessageBox.warning(self, "路径", "路径文件不存在。")
|
return
|
if self.controller.load_path(path_file):
|
self.set_path_points(self.controller.path_points, refit=True)
|
self._save_state({"path_file": path_file})
|
QtWidgets.QMessageBox.information(self, "路径", "路径加载成功。")
|
self._update_control_buttons()
|
else:
|
QtWidgets.QMessageBox.warning(self, "路径", "路径加载失败,请检查文件内容。")
|
|
def _update_origin_from_ui(self):
|
if self.controller.is_running():
|
QtWidgets.QMessageBox.warning(self, "原点", "请先停止运动控制,再更新原点。")
|
return
|
origin = self.origin_input.text().strip()
|
try:
|
self.controller.set_origin_geo(origin)
|
except ValueError as exc:
|
QtWidgets.QMessageBox.warning(self, "原点", f"原点解析失败: {exc}")
|
return
|
self._save_state({"origin_gga": origin})
|
QtWidgets.QMessageBox.information(self, "原点", "原点已更新。")
|
|
def _refresh_serial_ports(self, initial: bool = False):
|
ports = [p.device for p in list_ports.comports()]
|
saved_port = self.gui_state.get("serial_port", "")
|
if initial and saved_port and saved_port not in ports:
|
ports.append(saved_port)
|
current = self.port_combo.currentText()
|
self.port_combo.blockSignals(True)
|
self.port_combo.clear()
|
self.port_combo.addItems(ports)
|
target = saved_port or current
|
if target:
|
index = self.port_combo.findText(target)
|
if index >= 0:
|
self.port_combo.setCurrentIndex(index)
|
self.port_combo.blockSignals(False)
|
|
def _toggle_serial_connection(self):
|
QtWidgets.QMessageBox.warning(self, "串口", "HITL 模式下该操作由仿真器控制")
|
|
def _start_control(self):
|
QtWidgets.QMessageBox.warning(self, "控制", "HITL 模式下请通过仿真器启动")
|
|
def _stop_control(self):
|
QtWidgets.QMessageBox.warning(self, "控制", "HITL 模式下请通过仿真器停止")
|
|
def _is_control_running(self) -> bool:
|
return False
|
|
def _save_state(self, updates: Dict[str, str]):
|
self.gui_state.update(updates)
|
if self.persist_state_cb:
|
self.persist_state_cb(updates)
|
|
def _update_serial_ui(self):
|
self.serial_toggle_btn.setEnabled(False)
|
self.port_combo.setEnabled(False)
|
self.port_refresh_btn.setEnabled(False)
|
|
def _update_control_buttons(self):
|
self.start_button.setEnabled(False)
|
self.stop_button.setEnabled(False)
|
|
@QtCore.pyqtSlot()
|
def _on_control_finished(self):
|
pass
|
|
@QtCore.pyqtSlot(str)
|
def _show_error_message(self, message: str):
|
QtWidgets.QMessageBox.warning(self, "错误", message)
|
|
def closeEvent(self, event):
|
super().closeEvent(event)
|
else:
|
QtRealtimeDashboard = None
|
|
|
class GuiBridge:
|
"""控制线程向 Qt 主线程发送数据的桥接器"""
|
|
def __init__(self):
|
self.queue: queue.Queue = queue.Queue(maxsize=50)
|
self._last_publish_log = 0.0
|
|
def publish(self, data):
|
try:
|
self.queue.put_nowait(data)
|
except queue.Full:
|
try:
|
self.queue.get_nowait()
|
except queue.Empty:
|
pass
|
try:
|
self.queue.put_nowait(data)
|
except queue.Full:
|
pass
|
now = time.time()
|
if now - self._last_publish_log > 1.0:
|
print(f"[GUI] 发布数据到队列,当前长度 {self.queue.qsize()},显示位置 {data.get('pos_xyz')}")
|
self._last_publish_log = now
|
|
|
class ControlCommandSender:
|
"""控制命令发送器 - 发送转向和油门控制信号给STM32"""
|
|
# 协议常量
|
FRAME_HEADER1 = 0xAA
|
FRAME_HEADER2 = 0x55
|
TYPE_CONTROL = 0x10 # 控制命令类型
|
FRAME_FOOTER1 = 0x0D
|
FRAME_FOOTER2 = 0x0A
|
|
def __init__(self, serial_port: serial.Serial):
|
"""
|
初始化控制命令发送器
|
|
Args:
|
serial_port: 已打开的串口对象
|
"""
|
self.serial = serial_port
|
self.send_count = 0
|
|
def send_control_command(self, steering_pwm: int, throttle_pwm: int) -> bool:
|
"""
|
发送控制命令到STM32
|
|
协议格式: AA 55 10 04 00 STEER(2) THROTTLE(2) CHECKSUM(2) 0D 0A
|
|
Args:
|
steering_pwm: 转向PWM值 (1000-2000us)
|
throttle_pwm: 油门PWM值 (1000-2000us)
|
|
Returns:
|
发送成功返回True
|
"""
|
try:
|
# 限制PWM范围
|
steering_pwm = max(1000, min(2000, steering_pwm))
|
throttle_pwm = max(1000, min(2000, throttle_pwm))
|
|
# 构建数据包
|
# 帧头 (2B) + 类型 (1B) + 长度 (2B) + 数据 (4B) + 校验和 (2B) + 帧尾 (2B)
|
data_len = 4 # 2字节转向 + 2字节油门
|
|
payload = struct.pack('<HH', steering_pwm, throttle_pwm)
|
|
# 构建完整帧用于计算校验和
|
checksum_data = (
|
bytes([self.TYPE_CONTROL]) +
|
struct.pack('<H', data_len) +
|
payload
|
)
|
|
# 计算16位累加和校验
|
checksum = sum(checksum_data) & 0xFFFF
|
|
# 完整数据包
|
packet = (
|
bytes([self.FRAME_HEADER1, self.FRAME_HEADER2]) +
|
checksum_data +
|
struct.pack('<H', checksum) +
|
bytes([self.FRAME_FOOTER1, self.FRAME_FOOTER2])
|
)
|
|
# 发送
|
self.serial.write(packet)
|
self.serial.flush()
|
|
self.send_count += 1
|
return True
|
|
except Exception as e:
|
print(f"[ERROR] 控制命令发送失败: {e}")
|
return False
|
|
def get_stats(self):
|
"""获取发送统计"""
|
return self.send_count
|
|
|
class RealtimeController:
|
"""实时控制器 - 整合GPS/IMU接收、控制算法和命令发送"""
|
|
def __init__(self, port: str, baudrate: int = 921600,
|
offset_xyz: Tuple[float, float, float] = (0.0, 0.0, 0.0),
|
origin_geo: Optional[str] = None,
|
origin_alt_m: Optional[float] = None,
|
gui_bridge: Optional[GuiBridge] = None):
|
"""
|
初始化实时控制器
|
|
Args:
|
port: 串口设备名
|
baudrate: 波特率
|
offset_xyz: 车辆质心相对GPS天线在东-北-天(ENU)坐标系下的偏移量(米)
|
origin_geo: 坐标原点的经纬度 (度分格式字符串),用于计算ENU坐标
|
origin_alt_m: 原点的海拔高度(米),若未提供则默认使用首次GPS高度
|
"""
|
self.port = port
|
self.baudrate = baudrate
|
self.offset_xyz = offset_xyz
|
self.origin_geo = origin_geo or ""
|
self.origin_latlon: Optional[Tuple[float, float]] = None
|
self.origin_altitude: Optional[float] = origin_alt_m
|
self.origin_lat_rad: Optional[float] = None
|
self.origin_lon_rad: Optional[float] = None
|
self.origin_ecef: Optional[Tuple[float, float, float]] = None
|
try:
|
self._apply_origin_geo(self.origin_geo)
|
except ValueError as exc:
|
raise ValueError(f"原点坐标解析失败: {exc}") from exc
|
|
# GPS/IMU接收器
|
self.receiver: Optional[GPSIMUReceiver] = None
|
|
# 控制命令发送器 (共用同一个串口)
|
self.cmd_sender = None
|
|
# 运动控制器
|
self.mower_ctrl = None
|
|
# 状态变量
|
self.last_gps_time = 0
|
self.last_imu_time = 0
|
self.control_period = 1.0 / 74.0 # 74Hz控制频率
|
self.last_control_time = 0
|
|
# 数据缓存
|
self.latest_gps: Optional[GPSData] = None
|
self.latest_imu: Optional[IMUData] = None
|
self.path_points = []
|
self.trail_points = []
|
self.last_log_time = 0.0
|
self.gui_bridge = gui_bridge
|
self._stop_event = threading.Event()
|
self._last_gui_publish_log = 0.0
|
|
# 统计
|
self.control_count = 0
|
self.control_freq_hz = 0.0
|
self._freq_sample_time: Optional[float] = None
|
self._freq_sample_count = 0
|
self._running = False
|
|
def load_path(self, path_file: str) -> bool:
|
"""
|
加载路径文件并初始化控制器
|
支持 .json 和 .txt (X,Y;X,Y...) 格式
|
|
Args:
|
path_file: 路径文件路径
|
|
Returns:
|
成功返回True
|
"""
|
try:
|
with open(path_file, 'r', encoding='utf-8') as f:
|
content = f.read().strip()
|
|
path_points = []
|
|
# 尝试解析 JSON
|
try:
|
if path_file.endswith('.json') or content.startswith('['):
|
path_points = json.loads(content)
|
except json.JSONDecodeError:
|
# 假如不是JSON,尝试解析 TXT 格式
|
pass
|
|
# 如果 JSON 解析未成功或未尝试,则尝试 TXT 解析
|
if not path_points:
|
# TXT 格式: X,Y;X,Y;...
|
points_str = content.split(';')
|
for p_str in points_str:
|
if not p_str.strip():
|
continue
|
try:
|
parts = p_str.split(',')
|
if len(parts) >= 2:
|
x = float(parts[0])
|
y = float(parts[1])
|
path_points.append([x, y])
|
except ValueError:
|
print(f"[WARN] 忽略无效点: {p_str}")
|
|
if not path_points:
|
print("[ERROR] 未能解析出有效路径点")
|
return False
|
|
print(f"[INFO] 加载路径: {len(path_points)} 个点")
|
|
# 使用sim_offline.py中的高精度参数
|
params = {
|
# 核心控制增益
|
'k_heading': 2.0,
|
'k_xte': 0.8,
|
'k_heading_d': 0.1,
|
'k_heading_i': 0.015,
|
'k_xte_i': 0.02,
|
|
# 速度控制
|
'base_speed': 0.8,
|
'max_forward_mps': 1.2,
|
'min_follow_speed': 0.40,
|
'max_reverse_mps': 0.25,
|
'max_yawrate': math.pi/2.5,
|
|
# 前视距离
|
'lookahead_min': 0.35,
|
'lookahead_max': 1.2,
|
'lookahead_time': 2.0,
|
|
# 误差容忍度
|
'max_xte': 1.5,
|
'goal_tolerance': 0.1,
|
'large_err_threshold': 0.5,
|
|
# 滤波参数
|
'yawrate_alpha': 0.85,
|
|
# 速度缩放增益
|
'curv_gain_scale': 2.0,
|
'heading_err_gain_scale': 1.5,
|
'xte_gain_scale': 1.0,
|
|
# 终点减速区域
|
'slow_zone': 1.5,
|
'final_approach_dist': 0.8,
|
}
|
|
self.mower_ctrl = MowerController(path_points, params=params)
|
self.path_points = path_points
|
self.trail_points = []
|
print("[INFO] 运动控制器初始化完成")
|
return True
|
|
except Exception as e:
|
print(f"[ERROR] 加载路径失败: {e}")
|
return False
|
|
def connect(self) -> bool:
|
"""连接串口"""
|
if self.is_connected():
|
print("[INFO] 串口已连接")
|
return True
|
if not self.port:
|
print("[ERROR] 未指定串口端口")
|
return False
|
self.receiver = GPSIMUReceiver(self.port, self.baudrate)
|
if not self.receiver.connect():
|
self.receiver = None
|
return False
|
|
# 创建控制命令发送器 (共用串口对象)
|
self.cmd_sender = ControlCommandSender(self.receiver.serial)
|
print("[INFO] 控制命令发送器就绪")
|
return True
|
|
def disconnect(self):
|
"""断开连接"""
|
if self.receiver:
|
self.receiver.disconnect()
|
self.receiver = None
|
self.cmd_sender = None
|
|
def is_connected(self) -> bool:
|
return bool(self.receiver and self.receiver.serial and self.receiver.serial.is_open)
|
|
def is_running(self) -> bool:
|
return self._running
|
|
def set_port(self, port: str):
|
if self.is_connected():
|
raise RuntimeError("请先关闭当前串口,再切换端口")
|
self.port = port.strip()
|
|
def stop(self):
|
"""请求停止控制循环"""
|
self._stop_event.set()
|
|
def _update_visualizer(
|
self,
|
pos_xyz,
|
heading_deg: float,
|
pitch: float,
|
roll: float,
|
forward: int,
|
turn: int,
|
status: str,
|
stage: str,
|
target_xy: Optional[Tuple[float, float]],
|
):
|
if not self.gui_bridge:
|
return
|
|
info_lines = [
|
f"状态: {status}",
|
f"阶段: {stage}",
|
f"控制频率: {self.control_freq_hz:5.1f} Hz",
|
"",
|
"位置 (ENU, m):",
|
f" E: {pos_xyz[0]:+7.2f}",
|
f" N: {pos_xyz[1]:+7.2f}",
|
f" U: {pos_xyz[2]:+7.2f}",
|
"",
|
"姿态 (deg):",
|
f" Heading: {heading_deg:+7.2f}",
|
f" Pitch : {pitch:+7.2f}",
|
f" Roll : {roll:+7.2f}",
|
"",
|
f"控制输出: F={forward:+4d}, T={turn:+4d}",
|
]
|
if target_xy:
|
info_lines.append(f"瞄准点: ({target_xy[0]:+.2f}, {target_xy[1]:+.2f})")
|
|
info_str = "\n".join(info_lines)
|
|
if self.gui_bridge:
|
trail_snapshot = list(self.trail_points[-1500:])
|
payload = {
|
"info_str": info_str,
|
"pos_xyz": pos_xyz,
|
"heading_deg": heading_deg,
|
"trail": trail_snapshot,
|
"target": target_xy,
|
"control_freq_hz": self.control_freq_hz,
|
}
|
self.gui_bridge.publish(payload)
|
now = time.time()
|
if now - self._last_gui_publish_log > 1.0:
|
print(f"[GUI] 控制线程推送数据 pos={pos_xyz}, heading={heading_deg:.2f}, trail={len(trail_snapshot)}, target={target_xy}")
|
self._last_gui_publish_log = now
|
|
def _record_control_tick(self, current_time: float):
|
if self._freq_sample_time is None:
|
self._freq_sample_time = current_time
|
self._freq_sample_count = 1
|
return
|
self._freq_sample_count += 1
|
window = current_time - self._freq_sample_time
|
if window >= 1.0:
|
self.control_freq_hz = self._freq_sample_count / max(window, 1e-6)
|
self._freq_sample_count = 0
|
self._freq_sample_time = current_time
|
|
def _apply_origin_geo(self, origin_geo: Optional[str]):
|
origin_geo = (origin_geo or "").strip()
|
self.origin_latlon = None
|
self.origin_ecef = None
|
self.origin_lat_rad = None
|
self.origin_lon_rad = None
|
if not origin_geo:
|
return
|
lat, lon, alt = _parse_origin_geo(origin_geo)
|
self.origin_latlon = (lat, lon)
|
if alt is not None:
|
self.origin_altitude = alt
|
print(f"[INFO] 从原点配置中解析出高度: {self.origin_altitude:.3f} m")
|
print(f"[INFO] 坐标原点设置: 纬度={self.origin_latlon[0]:.8f}°, 经度={self.origin_latlon[1]:.8f}°")
|
|
def set_origin_geo(self, origin_geo: str):
|
"""更新原点坐标配置"""
|
self.origin_geo = origin_geo.strip()
|
self._apply_origin_geo(self.origin_geo)
|
|
def _ensure_origin_reference(self, fallback_altitude: float):
|
"""确保已根据原点经纬度初始化ECEF基准"""
|
if not self.origin_latlon or self.origin_ecef:
|
return
|
lat0, lon0 = self.origin_latlon
|
if self.origin_altitude is None:
|
self.origin_altitude = fallback_altitude
|
print(f"[INFO] 未指定原点高度,使用首个GPS高度 {self.origin_altitude:.3f} m")
|
self.origin_lat_rad = math.radians(lat0)
|
self.origin_lon_rad = math.radians(lon0)
|
self.origin_ecef = _geo_to_ecef(lat0, lon0, self.origin_altitude)
|
print(f"[INFO] 原点ECEF参考初始化完成 (Alt={self.origin_altitude:.3f} m)")
|
|
def _convert_gps_to_local(self, gps: GPSData) -> tuple:
|
"""
|
将GPS数据转换为本地坐标 (简化版,实际需要根据起点转换)
|
|
Returns:
|
(x, y, z, heading_rad, vx, vy)
|
"""
|
# TODO: 实际应用中需要将经纬度转换为相对起点的XY坐标
|
# 这里简化处理,假设GPS已提供相对坐标或使用UTM转换
|
|
# 航向角转换为弧度 (GPS输出0-360°,北为0°顺时针)
|
# 需要转换为数学坐标系 (东为0°逆时针)
|
heading_rad = math.radians(90 - gps.heading_angle)
|
heading_rad = wrap_angle(heading_rad)
|
|
# 速度分量 (GPS提供东北天坐标系)
|
vx = gps.east_velocity # 东向速度 = X轴速度
|
vy = gps.north_velocity # 北向速度 = Y轴速度
|
|
# 位置转换为ENU坐标
|
lat = gps.latitude
|
lon = gps.longitude
|
|
x = 0.0
|
y = 0.0
|
z = 0.0
|
|
if self.origin_latlon:
|
self._ensure_origin_reference(gps.altitude)
|
|
if self.origin_latlon and self.origin_ecef and self.origin_lat_rad is not None and self.origin_lon_rad is not None:
|
current_ecef = _geo_to_ecef(lat, lon, gps.altitude)
|
dx = current_ecef[0] - self.origin_ecef[0]
|
dy = current_ecef[1] - self.origin_ecef[1]
|
dz = current_ecef[2] - self.origin_ecef[2]
|
east, north, up = _ecef_to_enu(dx, dy, dz, self.origin_lat_rad, self.origin_lon_rad)
|
x = east
|
y = north
|
z = up
|
else:
|
x = lon * 111320.0 * math.cos(math.radians(lat)) # 近似米
|
y = lat * 110540.0 # 近似米
|
z = gps.altitude # 粗略使用GPS高度
|
|
# GPS天线 → 车辆中心的平移修正 (ENU坐标系)
|
dx, dy, dz_offset = self.offset_xyz
|
x += dx
|
y += dy
|
z += dz_offset
|
|
return (x, y, z, heading_rad, vx, vy)
|
|
def _control_to_pwm(self, forward: int, turn: int) -> tuple:
|
"""
|
将控制信号转换为PWM值
|
|
Args:
|
forward: 前进信号 (-100 到 100)
|
turn: 转向信号 (-100 到 100, 负为右转, 正为左转)
|
|
Returns:
|
(steering_pwm, throttle_pwm) 单位us
|
"""
|
# 油门: -100~100 映射到 1000~2000us, 中位1500us
|
throttle_pwm = int(1500 + forward * 5.0)
|
|
# 转向: -100~100 映射到 1000~2000us, 中位1500us
|
# 注意: 正值左转(1000), 负值右转(2000)
|
steering_pwm = int(1500 - turn * 5.0)
|
|
return (steering_pwm, throttle_pwm)
|
|
def run(self):
|
"""运行实时控制循环"""
|
if not self.mower_ctrl:
|
print("[ERROR] 请先加载路径文件")
|
return
|
if self._running:
|
print("[WARN] 控制循环已在运行")
|
return
|
if not self.is_connected() or not self.receiver:
|
print("[ERROR] 请先连接串口")
|
return
|
if not self.cmd_sender:
|
print("[ERROR] 控制命令发送器未就绪")
|
return
|
|
self._stop_event.clear()
|
self._running = True
|
|
print("[INFO] 开始实时控制... (按 Ctrl+C 停止)")
|
print("[INFO] 等待GPS数据...")
|
|
receiver = self.receiver
|
try:
|
start_time = time.time()
|
last_stats_time = start_time
|
self.last_log_time = start_time
|
|
# 等待第一个有效的GPS数据
|
gps_ready = False
|
while not gps_ready and not self._stop_event.is_set():
|
gps_data, imu_data = self.receiver.receive_packet()
|
|
if gps_data:
|
if gps_data.position_quality > 0 and gps_data.satellite_count >= 4:
|
self.latest_gps = gps_data
|
gps_ready = True
|
print(f"[INFO] GPS就绪: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}")
|
else:
|
print(f"[WARN] GPS质量不足: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}")
|
|
if imu_data:
|
self.latest_imu = imu_data
|
|
if self._stop_event.is_set():
|
return
|
|
print("[INFO] 开始控制循环")
|
|
while not self._stop_event.is_set():
|
current_time = time.time()
|
|
# 接收数据 (非阻塞式,快速轮询)
|
gps_data, imu_data = receiver.receive_packet() if receiver else (None, None)
|
|
if gps_data:
|
self.latest_gps = gps_data
|
self.last_gps_time = current_time
|
|
if imu_data:
|
self.latest_imu = imu_data
|
self.last_imu_time = current_time
|
|
# 控制周期检查 (74Hz)
|
if current_time - self.last_control_time >= self.control_period:
|
if self.latest_gps:
|
# 转换GPS数据到本地坐标
|
x, y, z, heading, vx, vy = self._convert_gps_to_local(self.latest_gps)
|
self.trail_points.append((x, y, z))
|
if len(self.trail_points) > 4000:
|
self.trail_points.pop(0)
|
|
# 更新控制器状态 (只用x,y)
|
self.mower_ctrl.update_gps((x, y), heading, (vx, vy), current_time)
|
|
if self.latest_imu:
|
accel = (self.latest_imu.accel_x, self.latest_imu.accel_y, self.latest_imu.accel_z)
|
gyro = (self.latest_imu.gyro_x, self.latest_imu.gyro_y, self.latest_imu.gyro_z)
|
self.mower_ctrl.update_imu(accel, gyro, current_time)
|
|
# 计算控制信号
|
control_output = self.mower_ctrl.compute_control(current_time)
|
control_info = control_output.get('info', {})
|
forward_signal = control_output['forward'] # -100~100
|
turn_signal = control_output['turn'] # -100~100
|
|
# 转换为PWM并发送
|
steering_pwm, throttle_pwm = self._control_to_pwm(forward_signal, turn_signal)
|
self.cmd_sender.send_control_command(steering_pwm, throttle_pwm)
|
|
self.control_count += 1
|
self._record_control_tick(current_time)
|
self.last_control_time = current_time
|
|
gps_heading = self.latest_gps.heading_angle
|
gps_pitch = self.latest_gps.pitch_angle
|
gps_roll = self.latest_gps.roll_angle
|
|
# 打印控制信息 (2Hz)
|
if current_time - self.last_log_time >= 0.5:
|
status = control_info.get('status', 'running')
|
xte = control_info.get('xte', 0)
|
# 数学角度转地理角度:heading_err_deg
|
# 注意 control_output['info']['heading_err'] 是弧度
|
heading_err_deg = math.degrees(control_info.get('heading_err', 0))
|
|
print(f"[LOG] POS_ENU: {x:.3f}, {y:.3f}, {z:.3f} | "
|
f"ATT(H/P/R): {gps_heading:.2f}°, {gps_pitch:.2f}°, {gps_roll:.2f}° | "
|
f"CTRL: S={status}, F={forward_signal}, T={turn_signal}, Err={xte:.2f}m")
|
self.last_log_time = current_time
|
|
stage_label = control_info.get('stage', getattr(self.mower_ctrl, 'stage', ''))
|
target_xy = control_info.get('target_xy')
|
self._update_visualizer(
|
(x, y, z),
|
gps_heading,
|
gps_pitch,
|
gps_roll,
|
forward_signal,
|
turn_signal,
|
control_info.get('status', 'running'),
|
stage_label if stage_label else 'unknown',
|
target_xy,
|
)
|
|
# 检查是否完成
|
if control_info.get('status') == 'finished':
|
print("[INFO] 路径跟踪完成!")
|
self.stop()
|
break
|
|
# 统计信息 (每10秒)
|
# if current_time - last_stats_time > 10.0:
|
# print(f"\n========== 运行统计 ==========")
|
# print(f"运行时间: {current_time - start_time:.1f}s")
|
# print(f"控制次数: {self.control_count}")
|
# print(f"GPS数据包: {self.receiver.gps_count}")
|
# print(f"IMU数据包: {self.receiver.imu_count}")
|
# print(f"命令发送: {self.cmd_sender.get_stats()}")
|
# print(f"错误计数: {self.receiver.error_count}")
|
|
# if self.latest_gps:
|
# print(f"GPS状态: 质量={self.latest_gps.position_quality}, "
|
# f"卫星={self.latest_gps.satellite_count}, "
|
# f"航向={self.latest_gps.heading_angle:.1f}°")
|
|
# print(f"==============================\n")
|
# last_stats_time = current_time
|
|
except KeyboardInterrupt:
|
print("\n[INFO] 用户中断")
|
self.stop()
|
|
finally:
|
self._running = False
|
self._stop_event.clear()
|
# 发送停止命令 (中位值)
|
if self.cmd_sender:
|
print("[INFO] 发送停止命令...")
|
for _ in range(5):
|
self.cmd_sender.send_control_command(1500, 1500)
|
time.sleep(0.02)
|
|
# 打印最终统计
|
print("\n========== 最终统计 ==========")
|
print(f"总控制次数: {self.control_count}")
|
if receiver:
|
print(f"GPS数据包: {receiver.gps_count}")
|
print(f"IMU数据包: {receiver.imu_count}")
|
if self.cmd_sender:
|
print(f"命令发送: {self.cmd_sender.get_stats()}")
|
if receiver:
|
print(f"错误计数: {receiver.error_count}")
|
print("==============================\n")
|
|
|
def main():
|
"""主函数"""
|
BAUDRATE = 921600
|
DEFAULT_PORT = "COM17"
|
DEFAULT_PATH_FILE = "gecaolujing2.txt"
|
DEFAULT_ORIGIN_GGA = "$GNGGA,060956.700,3949.8890014,N,11616.7555551,E,4,20,0.68,46.621,M,-6.679,M,1.0,0409*7F"
|
OFFSET_XYZ = (0.0, 0.0, 0.0)
|
|
gui_state = load_gui_state()
|
selected_port = gui_state.get("serial_port") or DEFAULT_PORT
|
origin_geo = gui_state.get("origin_gga") or DEFAULT_ORIGIN_GGA
|
path_file = gui_state.get("path_file")
|
|
gui_bridge = GuiBridge() if QtWidgets else None
|
|
controller = RealtimeController(
|
selected_port,
|
BAUDRATE,
|
offset_xyz=OFFSET_XYZ,
|
origin_geo=origin_geo,
|
origin_alt_m=None,
|
gui_bridge=gui_bridge,
|
)
|
|
# 预加载路径文件(若存在)
|
candidate_path = path_file or DEFAULT_PATH_FILE
|
if candidate_path and Path(candidate_path).exists():
|
controller.load_path(candidate_path)
|
|
def persist_state(updates: Dict[str, str]):
|
gui_state.update(updates)
|
save_gui_state(gui_state)
|
|
if gui_bridge and QtWidgets:
|
app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([])
|
dashboard = QtRealtimeDashboard(controller, gui_bridge.queue, gui_state, persist_state)
|
dashboard.show()
|
try:
|
app.exec_()
|
finally:
|
controller.stop()
|
controller.disconnect()
|
else:
|
# 无Qt环境时,沿用旧的命令行工作流
|
if not controller.path_points and Path(DEFAULT_PATH_FILE).exists():
|
controller.load_path(DEFAULT_PATH_FILE)
|
if not controller.connect():
|
print("[ERROR] 串口连接失败,退出")
|
return
|
try:
|
controller.run()
|
finally:
|
controller.disconnect()
|
|
|
if __name__ == "__main__":
|
main()
|