#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 实时闭环控制系统 接收 GPS/IMU 数据 → 运动控制算法 → 下发控制命令 """ import serial import struct import time import json import math from typing import Optional from gps_imu_receiver import GPSIMUReceiver, GPSData, IMUData from mower_controller import MowerController, wrap_angle class ControlCommandSender: """控制命令发送器 - 发送转向和油门控制信号给STM32""" # 协议常量 FRAME_HEADER1 = 0xAA FRAME_HEADER2 = 0x55 TYPE_CONTROL = 0x10 # 控制命令类型 FRAME_FOOTER1 = 0x0D FRAME_FOOTER2 = 0x0A def __init__(self, serial_port: serial.Serial): """ 初始化控制命令发送器 Args: serial_port: 已打开的串口对象 """ self.serial = serial_port self.send_count = 0 def send_control_command(self, steering_pwm: int, throttle_pwm: int) -> bool: """ 发送控制命令到STM32 协议格式: AA 55 10 04 00 STEER(2) THROTTLE(2) CHECKSUM(2) 0D 0A Args: steering_pwm: 转向PWM值 (1000-2000us) throttle_pwm: 油门PWM值 (1000-2000us) Returns: 发送成功返回True """ try: # 限制PWM范围 steering_pwm = max(1000, min(2000, steering_pwm)) throttle_pwm = max(1000, min(2000, throttle_pwm)) # 构建数据包 # 帧头 (2B) + 类型 (1B) + 长度 (2B) + 数据 (4B) + 校验和 (2B) + 帧尾 (2B) data_len = 4 # 2字节转向 + 2字节油门 payload = struct.pack(' bool: """ 加载路径文件并初始化控制器 Args: path_file: JSON路径文件 Returns: 成功返回True """ try: with open(path_file, 'r') as f: path_points = json.load(f) print(f"[INFO] 加载路径: {len(path_points)} 个点") # 使用sim_offline.py中的高精度参数 params = { # 核心控制增益 'k_heading': 2.0, 'k_xte': 0.8, 'k_heading_d': 0.1, 'k_heading_i': 0.015, 'k_xte_i': 0.02, # 速度控制 'base_speed': 0.8, 'max_forward_mps': 1.2, 'min_follow_speed': 0.40, 'max_reverse_mps': 0.25, 'max_yawrate': math.pi/2.5, # 前视距离 'lookahead_min': 0.35, 'lookahead_max': 1.2, 'lookahead_time': 2.0, # 误差容忍度 'max_xte': 1.5, 'goal_tolerance': 0.1, 'large_err_threshold': 0.5, # 滤波参数 'yawrate_alpha': 0.85, # 速度缩放增益 'curv_gain_scale': 2.0, 'heading_err_gain_scale': 1.5, 'xte_gain_scale': 1.0, # 终点减速区域 'slow_zone': 1.5, 'final_approach_dist': 0.8, } self.mower_ctrl = MowerController(path_points, params=params) print("[INFO] 运动控制器初始化完成") return True except Exception as e: print(f"[ERROR] 加载路径失败: {e}") return False def connect(self) -> bool: """连接串口""" if not self.receiver.connect(): return False # 创建控制命令发送器 (共用串口对象) self.cmd_sender = ControlCommandSender(self.receiver.serial) print("[INFO] 控制命令发送器就绪") return True def disconnect(self): """断开连接""" self.receiver.disconnect() def _convert_gps_to_local(self, gps: GPSData) -> tuple: """ 将GPS数据转换为本地坐标 (简化版,实际需要根据起点转换) Returns: (x, y, heading_rad, vx, vy) """ # TODO: 实际应用中需要将经纬度转换为相对起点的XY坐标 # 这里简化处理,假设GPS已提供相对坐标或使用UTM转换 # 航向角转换为弧度 (GPS输出0-360°,北为0°顺时针) # 需要转换为数学坐标系 (东为0°逆时针) heading_rad = math.radians(90 - gps.heading_angle) heading_rad = wrap_angle(heading_rad) # 速度分量 (GPS提供东北天坐标系) vx = gps.east_velocity # 东向速度 = X轴速度 vy = gps.north_velocity # 北向速度 = Y轴速度 # 位置 (简化,实际需要坐标转换) x = gps.longitude * 111320.0 * math.cos(math.radians(gps.latitude)) # 近似米 y = gps.latitude * 110540.0 # 近似米 return (x, y, heading_rad, vx, vy) def _control_to_pwm(self, forward: int, turn: int) -> tuple: """ 将控制信号转换为PWM值 Args: forward: 前进信号 (-100 到 100) turn: 转向信号 (-100 到 100, 负为右转, 正为左转) Returns: (steering_pwm, throttle_pwm) 单位us """ # 油门: -100~100 映射到 1000~2000us, 中位1500us throttle_pwm = int(1500 + forward * 5.0) # 转向: -100~100 映射到 1000~2000us, 中位1500us # 注意: 正值左转(1000), 负值右转(2000) steering_pwm = int(1500 - turn * 5.0) return (steering_pwm, throttle_pwm) def run(self): """运行实时控制循环""" if not self.mower_ctrl: print("[ERROR] 请先加载路径文件") return print("[INFO] 开始实时控制... (按 Ctrl+C 停止)") print("[INFO] 等待GPS数据...") try: start_time = time.time() last_stats_time = start_time # 等待第一个有效的GPS数据 gps_ready = False while not gps_ready: gps_data, imu_data = self.receiver.receive_packet() if gps_data: if gps_data.position_quality > 0 and gps_data.satellite_count >= 4: self.latest_gps = gps_data gps_ready = True print(f"[INFO] GPS就绪: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}") else: print(f"[WARN] GPS质量不足: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}") if imu_data: self.latest_imu = imu_data print("[INFO] 开始控制循环") while True: current_time = time.time() # 接收数据 (非阻塞式,快速轮询) gps_data, imu_data = self.receiver.receive_packet() if gps_data: self.latest_gps = gps_data self.last_gps_time = current_time if imu_data: self.latest_imu = imu_data self.last_imu_time = current_time # 控制周期检查 (74Hz) if current_time - self.last_control_time >= self.control_period: if self.latest_gps: # 转换GPS数据到本地坐标 x, y, heading, vx, vy = self._convert_gps_to_local(self.latest_gps) # 更新控制器状态 self.mower_ctrl.update_gps((x, y), heading, (vx, vy), current_time) if self.latest_imu: accel = (self.latest_imu.accel_x, self.latest_imu.accel_y, self.latest_imu.accel_z) gyro = (self.latest_imu.gyro_x, self.latest_imu.gyro_y, self.latest_imu.gyro_z) self.mower_ctrl.update_imu(accel, gyro, current_time) # 计算控制信号 control_output = self.mower_ctrl.compute_control(current_time) forward_signal = control_output['forward'] # -100~100 turn_signal = control_output['turn'] # -100~100 # 转换为PWM并发送 steering_pwm, throttle_pwm = self._control_to_pwm(forward_signal, turn_signal) self.cmd_sender.send_control_command(steering_pwm, throttle_pwm) self.control_count += 1 self.last_control_time = current_time # 打印控制信息 (降低频率) if self.control_count % 74 == 0: # 每秒打印一次 status = control_output['info'].get('status', 'running') xte = control_output['info'].get('xte', 0) heading_err_deg = math.degrees(control_output['info'].get('heading_err', 0)) print(f"[CTRL] 状态: {status}, " f"前进: {forward_signal:+4d}, 转向: {turn_signal:+4d}, " f"PWM: S={steering_pwm}, T={throttle_pwm}, " f"横向误差: {xte:.2f}m, 航向误差: {heading_err_deg:+.1f}°") # 检查是否完成 if control_output['info'].get('status') == 'finished': print("[INFO] 路径跟踪完成!") break # 统计信息 (每10秒) if current_time - last_stats_time > 10.0: print(f"\n========== 运行统计 ==========") print(f"运行时间: {current_time - start_time:.1f}s") print(f"控制次数: {self.control_count}") print(f"GPS数据包: {self.receiver.gps_count}") print(f"IMU数据包: {self.receiver.imu_count}") print(f"命令发送: {self.cmd_sender.get_stats()}") print(f"错误计数: {self.receiver.error_count}") if self.latest_gps: print(f"GPS状态: 质量={self.latest_gps.position_quality}, " f"卫星={self.latest_gps.satellite_count}, " f"航向={self.latest_gps.heading_angle:.1f}°") print(f"==============================\n") last_stats_time = current_time except KeyboardInterrupt: print("\n[INFO] 用户中断") finally: # 发送停止命令 (中位值) print("[INFO] 发送停止命令...") for _ in range(5): self.cmd_sender.send_control_command(1500, 1500) time.sleep(0.02) # 打印最终统计 print("\n========== 最终统计 ==========") print(f"总控制次数: {self.control_count}") print(f"GPS数据包: {self.receiver.gps_count}") print(f"IMU数据包: {self.receiver.imu_count}") print(f"命令发送: {self.cmd_sender.get_stats()}") print(f"错误计数: {self.receiver.error_count}") print("==============================\n") def main(): """主函数""" # 配置 PORT = "COM17" # 根据实际情况修改 BAUDRATE = 921600 PATH_FILE = "example_path.json" # 路径文件 # 创建控制器 controller = RealtimeController(PORT, BAUDRATE) # 加载路径 if not controller.load_path(PATH_FILE): print("[ERROR] 路径加载失败,退出") return # 连接串口 if not controller.connect(): print("[ERROR] 串口连接失败,退出") return # 运行控制循环 try: controller.run() finally: controller.disconnect() if __name__ == "__main__": main()