| | |
| | | info_scroll.setMinimumWidth(320) |
| | | right_layout.addWidget(info_scroll, stretch=1) |
| | | |
| | | self.path_browse_btn.clicked.connect(self._browse_path_file) |
| | | self.path_load_btn.clicked.connect(self._load_path_from_ui) |
| | | self.origin_update_btn.clicked.connect(self._update_origin_from_ui) |
| | | self.port_refresh_btn.clicked.connect(lambda: self._refresh_serial_ports(initial=False)) |
| | | self.serial_toggle_btn.clicked.connect(self._toggle_serial_connection) |
| | | self.start_button.clicked.connect(self._start_control) |
| | | self.stop_button.clicked.connect(self._stop_control) |
| | | if self.controller is not None: |
| | | self.path_browse_btn.clicked.connect(self._browse_path_file) |
| | | self.path_load_btn.clicked.connect(self._load_path_from_ui) |
| | | self.origin_update_btn.clicked.connect(self._update_origin_from_ui) |
| | | self.port_refresh_btn.clicked.connect(lambda: self._refresh_serial_ports(initial=False)) |
| | | self.serial_toggle_btn.clicked.connect(self._toggle_serial_connection) |
| | | self.start_button.clicked.connect(self._start_control) |
| | | self.stop_button.clicked.connect(self._stop_control) |
| | | |
| | | def _apply_saved_values(self): |
| | | path_file = self.gui_state.get("path_file", "") |
| | |
| | | self.port_combo.blockSignals(False) |
| | | |
| | | def _toggle_serial_connection(self): |
| | | if self.controller.is_connected(): |
| | | if self.controller.is_running(): |
| | | QtWidgets.QMessageBox.warning(self, "串口", "请先停止运动控制,再关闭串口。") |
| | | return |
| | | self.controller.disconnect() |
| | | QtWidgets.QMessageBox.information(self, "串口", "串口已关闭。") |
| | | else: |
| | | port = self.port_combo.currentText().strip() |
| | | if not port: |
| | | QtWidgets.QMessageBox.warning(self, "串口", "没有可用串口,请先连接设备。") |
| | | return |
| | | try: |
| | | self.controller.set_port(port) |
| | | except RuntimeError as exc: |
| | | QtWidgets.QMessageBox.warning(self, "串口", str(exc)) |
| | | return |
| | | if not self.controller.connect(): |
| | | QtWidgets.QMessageBox.warning(self, "串口", "串口打开失败,请检查连接。") |
| | | return |
| | | self._save_state({"serial_port": port}) |
| | | QtWidgets.QMessageBox.information(self, "串口", f"串口 {port} 已打开。") |
| | | self._update_serial_ui() |
| | | self._update_control_buttons() |
| | | QtWidgets.QMessageBox.warning(self, "串口", "HITL 模式下该操作由仿真器控制") |
| | | |
| | | def _start_control(self): |
| | | if self._is_control_running(): |
| | | return |
| | | if not self.controller.mower_ctrl: |
| | | QtWidgets.QMessageBox.warning(self, "控制", "请先加载路径文件。") |
| | | return |
| | | if not self.controller.is_connected(): |
| | | QtWidgets.QMessageBox.warning(self, "控制", "请先打开串口。") |
| | | return |
| | | |
| | | self.start_button.setEnabled(False) |
| | | self.stop_button.setEnabled(True) |
| | | |
| | | def control_entry(): |
| | | try: |
| | | self.controller.run() |
| | | except Exception as exc: |
| | | print(f"[ERROR] 控制线程异常: {exc}") |
| | | QtCore.QMetaObject.invokeMethod( |
| | | self, |
| | | "_show_error_message", |
| | | QtCore.Qt.QueuedConnection, |
| | | QtCore.Q_ARG(str, f"控制线程异常: {exc}"), |
| | | ) |
| | | finally: |
| | | QtCore.QMetaObject.invokeMethod(self, "_on_control_finished", QtCore.Qt.QueuedConnection) |
| | | |
| | | self.control_thread = threading.Thread(target=control_entry, daemon=True) |
| | | self.control_thread.start() |
| | | self._update_control_buttons() |
| | | QtWidgets.QMessageBox.warning(self, "控制", "HITL 模式下请通过仿真器启动") |
| | | |
| | | def _stop_control(self): |
| | | if not self._is_control_running(): |
| | | return |
| | | self.controller.stop() |
| | | self.stop_button.setEnabled(False) |
| | | QtWidgets.QMessageBox.warning(self, "控制", "HITL 模式下请通过仿真器停止") |
| | | |
| | | def _is_control_running(self) -> bool: |
| | | return bool(self.control_thread and self.control_thread.is_alive()) |
| | | return False |
| | | |
| | | def _save_state(self, updates: Dict[str, str]): |
| | | self.gui_state.update(updates) |
| | |
| | | self.persist_state_cb(updates) |
| | | |
| | | def _update_serial_ui(self): |
| | | connected = self.controller.is_connected() |
| | | self.serial_toggle_btn.setText("关闭串口" if connected else "打开串口") |
| | | self.port_combo.setEnabled(not connected) |
| | | self.port_refresh_btn.setEnabled(not connected) |
| | | self.serial_toggle_btn.setEnabled(False) |
| | | self.port_combo.setEnabled(False) |
| | | self.port_refresh_btn.setEnabled(False) |
| | | |
| | | def _update_control_buttons(self): |
| | | can_start = bool(self.controller.mower_ctrl and self.controller.is_connected() and not self._is_control_running()) |
| | | self.start_button.setEnabled(can_start) |
| | | self.stop_button.setEnabled(self._is_control_running()) |
| | | self.start_button.setEnabled(False) |
| | | self.stop_button.setEnabled(False) |
| | | |
| | | @QtCore.pyqtSlot() |
| | | def _on_control_finished(self): |
| | | self.control_thread = None |
| | | self._update_control_buttons() |
| | | self.stop_button.setEnabled(False) |
| | | pass |
| | | |
| | | @QtCore.pyqtSlot(str) |
| | | def _show_error_message(self, message: str): |
| | | QtWidgets.QMessageBox.warning(self, "错误", message) |
| | | |
| | | def closeEvent(self, event): |
| | | self._stop_control() |
| | | if self.control_thread: |
| | | self.control_thread.join(timeout=3.0) |
| | | if self.controller.is_connected(): |
| | | self.controller.disconnect() |
| | | super().closeEvent(event) |
| | | else: |
| | | QtRealtimeDashboard = None |
| | |
| | | def disconnect(self): |
| | | """断开连接""" |
| | | if self.receiver: |
| | | self.receiver.disconnect() |
| | | self.receiver.disconnect() |
| | | self.receiver = None |
| | | self.cmd_sender = None |
| | | |
| | |
| | | |
| | | if self._stop_event.is_set(): |
| | | return |
| | | |
| | | |
| | | print("[INFO] 开始控制循环") |
| | | |
| | | while not self._stop_event.is_set(): |
| | |
| | | # 数学角度转地理角度:heading_err_deg |
| | | # 注意 control_output['info']['heading_err'] 是弧度 |
| | | heading_err_deg = math.degrees(control_info.get('heading_err', 0)) |
| | | |
| | | |
| | | print(f"[LOG] POS_ENU: {x:.3f}, {y:.3f}, {z:.3f} | " |
| | | f"ATT(H/P/R): {gps_heading:.2f}°, {gps_pitch:.2f}°, {gps_roll:.2f}° | " |
| | | f"CTRL: S={status}, F={forward_signal}, T={turn_signal}, Err={xte:.2f}m") |
| | |
| | | self._stop_event.clear() |
| | | # 发送停止命令 (中位值) |
| | | if self.cmd_sender: |
| | | print("[INFO] 发送停止命令...") |
| | | for _ in range(5): |
| | | self.cmd_sender.send_control_command(1500, 1500) |
| | | time.sleep(0.02) |
| | | print("[INFO] 发送停止命令...") |
| | | for _ in range(5): |
| | | self.cmd_sender.send_control_command(1500, 1500) |
| | | time.sleep(0.02) |
| | | |
| | | # 打印最终统计 |
| | | print("\n========== 最终统计 ==========") |
| | |
| | | print(f"GPS数据包: {receiver.gps_count}") |
| | | print(f"IMU数据包: {receiver.imu_count}") |
| | | if self.cmd_sender: |
| | | print(f"命令发送: {self.cmd_sender.get_stats()}") |
| | | print(f"命令发送: {self.cmd_sender.get_stats()}") |
| | | if receiver: |
| | | print(f"错误计数: {receiver.error_count}") |
| | | print("==============================\n") |
| | |
| | | origin_alt_m=None, |
| | | gui_bridge=gui_bridge, |
| | | ) |
| | | |
| | | |
| | | # 预加载路径文件(若存在) |
| | | candidate_path = path_file or DEFAULT_PATH_FILE |
| | | if candidate_path and Path(candidate_path).exists(): |
| | |
| | | # 无Qt环境时,沿用旧的命令行工作流 |
| | | if not controller.path_points and Path(DEFAULT_PATH_FILE).exists(): |
| | | controller.load_path(DEFAULT_PATH_FILE) |
| | | if not controller.connect(): |
| | | print("[ERROR] 串口连接失败,退出") |
| | | return |
| | | try: |
| | | controller.run() |
| | | finally: |
| | | controller.disconnect() |
| | | if not controller.connect(): |
| | | print("[ERROR] 串口连接失败,退出") |
| | | return |
| | | try: |
| | | controller.run() |
| | | finally: |
| | | controller.disconnect() |
| | | |
| | | |
| | | if __name__ == "__main__": |