#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 实时闭环控制系统 接收 GPS/IMU 数据 → 运动控制算法 → 下发控制命令 """ import serial import struct import time import json import math from typing import Optional, Tuple from gps_imu_receiver import GPSIMUReceiver, GPSData, IMUData from mower_controller import MowerController, wrap_angle EARTH_RADIUS_M = 6378137.0 WGS84_A = 6378137.0 # 半长轴 WGS84_F = 1 / 298.257223563 WGS84_E2 = WGS84_F * (2 - WGS84_F) def _dm_to_decimal(dm_value: float) -> float: """ 将度分格式 (DDMM.MMMM) 转为十进制度 """ degrees = int(dm_value // 100) minutes = dm_value - degrees * 100 return degrees + minutes / 60.0 def _parse_origin_geo(origin_geo: str) -> Tuple[float, float, Optional[float]]: """ 解析原点坐标字符串,支持三种格式: 1. 标准格式: "3949.9120,N,11616.8544,E" 2. 带前导逗号: ",3949.9120,N,11616.8544,E" 3. GGA报文: "$GNGGA,060956.700,3949.8890014,N,11616.7555551,E,4,20,0.68,46.621,M,-6.679,M,1.0,0409*7F" Returns: (lat, lon, alt) 如果原点字符串中不包含高度信息,则 alt 返回 None """ origin_geo = origin_geo.strip() # 判断是否为 GGA 报文 (以 $G 开头,包含 GGA) if origin_geo.startswith('$') and 'GGA' in origin_geo: # 解析 GGA 报文 parts = origin_geo.split(',') if len(parts) < 10: raise ValueError(f"GGA 报文格式错误或不完整: {origin_geo}") # GGA 格式: $xxGGA,time,lat,NS,lon,EW,quality,numSV,HDOP,alt,altUnit... # parts[2]: 纬度 (ddmm.mmmmm) # parts[3]: N/S # parts[4]: 经度 (dddmm.mmmmm) # parts[5]: E/W # parts[9]: 高度 lat_dm_str = parts[2] lat_dir = parts[3] lon_dm_str = parts[4] lon_dir = parts[5] alt_str = parts[9] if not (lat_dm_str and lat_dir and lon_dm_str and lon_dir and alt_str): raise ValueError("GGA 报文缺少必要的坐标或高度信息") lat_dm = float(lat_dm_str) lon_dm = float(lon_dm_str) alt = float(alt_str) lat = _dm_to_decimal(lat_dm) lon = _dm_to_decimal(lon_dm) if lat_dir.upper() == 'S': lat = -lat if lon_dir.upper() == 'W': lon = -lon return lat, lon, alt else: # 旧的简略格式处理 if origin_geo.startswith(','): origin_geo = origin_geo[1:] parts = [p.strip() for p in origin_geo.split(',')] parts = [p for p in parts if p] if len(parts) != 4: raise ValueError("原点坐标格式应为 '纬度度分,N/S,经度度分,E/W' 或 GGA 报文") try: lat_dm = float(parts[0]) lon_dm = float(parts[2]) except ValueError as exc: raise ValueError("度分字段必须是数字") from exc lat = _dm_to_decimal(lat_dm) lon = _dm_to_decimal(lon_dm) lat_dir = parts[1].upper() lon_dir = parts[3].upper() if lat_dir not in ("N", "S"): raise ValueError("纬度方向必须为 N 或 S") if lon_dir not in ("E", "W"): raise ValueError("经度方向必须为 E 或 W") if lat_dir == "S": lat = -lat if lon_dir == "W": lon = -lon return lat, lon, None def _geo_to_ecef(lat_deg: float, lon_deg: float, alt_m: float) -> Tuple[float, float, float]: """将经纬度/高度转换为ECEF坐标""" lat_rad = math.radians(lat_deg) lon_rad = math.radians(lon_deg) sin_lat = math.sin(lat_rad) cos_lat = math.cos(lat_rad) sin_lon = math.sin(lon_rad) cos_lon = math.cos(lon_rad) N = WGS84_A / math.sqrt(1 - WGS84_E2 * sin_lat * sin_lat) x = (N + alt_m) * cos_lat * cos_lon y = (N + alt_m) * cos_lat * sin_lon z = (N * (1 - WGS84_E2) + alt_m) * sin_lat return x, y, z def _ecef_to_enu(dx: float, dy: float, dz: float, lat0_rad: float, lon0_rad: float) -> Tuple[float, float, float]: """将ECEF差值转换为ENU坐标""" sin_lat = math.sin(lat0_rad) cos_lat = math.cos(lat0_rad) sin_lon = math.sin(lon0_rad) cos_lon = math.cos(lon0_rad) east = -sin_lon * dx + cos_lon * dy north = -sin_lat * cos_lon * dx - sin_lat * sin_lon * dy + cos_lat * dz up = cos_lat * cos_lon * dx + cos_lat * sin_lon * dy + sin_lat * dz return east, north, up class ControlCommandSender: """控制命令发送器 - 发送转向和油门控制信号给STM32""" # 协议常量 FRAME_HEADER1 = 0xAA FRAME_HEADER2 = 0x55 TYPE_CONTROL = 0x10 # 控制命令类型 FRAME_FOOTER1 = 0x0D FRAME_FOOTER2 = 0x0A def __init__(self, serial_port: serial.Serial): """ 初始化控制命令发送器 Args: serial_port: 已打开的串口对象 """ self.serial = serial_port self.send_count = 0 def send_control_command(self, steering_pwm: int, throttle_pwm: int) -> bool: """ 发送控制命令到STM32 协议格式: AA 55 10 04 00 STEER(2) THROTTLE(2) CHECKSUM(2) 0D 0A Args: steering_pwm: 转向PWM值 (1000-2000us) throttle_pwm: 油门PWM值 (1000-2000us) Returns: 发送成功返回True """ try: # 限制PWM范围 steering_pwm = max(1000, min(2000, steering_pwm)) throttle_pwm = max(1000, min(2000, throttle_pwm)) # 构建数据包 # 帧头 (2B) + 类型 (1B) + 长度 (2B) + 数据 (4B) + 校验和 (2B) + 帧尾 (2B) data_len = 4 # 2字节转向 + 2字节油门 payload = struct.pack(' bool: """ 加载路径文件并初始化控制器 支持 .json 和 .txt (X,Y;X,Y...) 格式 Args: path_file: 路径文件路径 Returns: 成功返回True """ try: with open(path_file, 'r', encoding='utf-8') as f: content = f.read().strip() path_points = [] # 尝试解析 JSON try: if path_file.endswith('.json') or content.startswith('['): path_points = json.loads(content) except json.JSONDecodeError: # 假如不是JSON,尝试解析 TXT 格式 pass # 如果 JSON 解析未成功或未尝试,则尝试 TXT 解析 if not path_points: # TXT 格式: X,Y;X,Y;... points_str = content.split(';') for p_str in points_str: if not p_str.strip(): continue try: parts = p_str.split(',') if len(parts) >= 2: x = float(parts[0]) y = float(parts[1]) path_points.append([x, y]) except ValueError: print(f"[WARN] 忽略无效点: {p_str}") if not path_points: print("[ERROR] 未能解析出有效路径点") return False print(f"[INFO] 加载路径: {len(path_points)} 个点") # 使用sim_offline.py中的高精度参数 params = { # 核心控制增益 'k_heading': 2.0, 'k_xte': 0.8, 'k_heading_d': 0.1, 'k_heading_i': 0.015, 'k_xte_i': 0.02, # 速度控制 'base_speed': 0.8, 'max_forward_mps': 1.2, 'min_follow_speed': 0.40, 'max_reverse_mps': 0.25, 'max_yawrate': math.pi/2.5, # 前视距离 'lookahead_min': 0.35, 'lookahead_max': 1.2, 'lookahead_time': 2.0, # 误差容忍度 'max_xte': 1.5, 'goal_tolerance': 0.1, 'large_err_threshold': 0.5, # 滤波参数 'yawrate_alpha': 0.85, # 速度缩放增益 'curv_gain_scale': 2.0, 'heading_err_gain_scale': 1.5, 'xte_gain_scale': 1.0, # 终点减速区域 'slow_zone': 1.5, 'final_approach_dist': 0.8, } self.mower_ctrl = MowerController(path_points, params=params) print("[INFO] 运动控制器初始化完成") return True except Exception as e: print(f"[ERROR] 加载路径失败: {e}") return False def connect(self) -> bool: """连接串口""" if not self.receiver.connect(): return False # 创建控制命令发送器 (共用串口对象) self.cmd_sender = ControlCommandSender(self.receiver.serial) print("[INFO] 控制命令发送器就绪") return True def disconnect(self): """断开连接""" self.receiver.disconnect() def _ensure_origin_reference(self, fallback_altitude: float): """确保已根据原点经纬度初始化ECEF基准""" if not self.origin_latlon or self.origin_ecef: return lat0, lon0 = self.origin_latlon if self.origin_altitude is None: self.origin_altitude = fallback_altitude print(f"[INFO] 未指定原点高度,使用首个GPS高度 {self.origin_altitude:.3f} m") self.origin_lat_rad = math.radians(lat0) self.origin_lon_rad = math.radians(lon0) self.origin_ecef = _geo_to_ecef(lat0, lon0, self.origin_altitude) print(f"[INFO] 原点ECEF参考初始化完成 (Alt={self.origin_altitude:.3f} m)") def _convert_gps_to_local(self, gps: GPSData) -> tuple: """ 将GPS数据转换为本地坐标 (简化版,实际需要根据起点转换) Returns: (x, y, z, heading_rad, vx, vy) """ # TODO: 实际应用中需要将经纬度转换为相对起点的XY坐标 # 这里简化处理,假设GPS已提供相对坐标或使用UTM转换 # 航向角转换为弧度 (GPS输出0-360°,北为0°顺时针) # 需要转换为数学坐标系 (东为0°逆时针) heading_rad = math.radians(90 - gps.heading_angle) heading_rad = wrap_angle(heading_rad) # 速度分量 (GPS提供东北天坐标系) vx = gps.east_velocity # 东向速度 = X轴速度 vy = gps.north_velocity # 北向速度 = Y轴速度 # 位置转换为ENU坐标 lat = gps.latitude lon = gps.longitude x = 0.0 y = 0.0 z = 0.0 if self.origin_latlon: self._ensure_origin_reference(gps.altitude) if self.origin_latlon and self.origin_ecef and self.origin_lat_rad is not None and self.origin_lon_rad is not None: current_ecef = _geo_to_ecef(lat, lon, gps.altitude) dx = current_ecef[0] - self.origin_ecef[0] dy = current_ecef[1] - self.origin_ecef[1] dz = current_ecef[2] - self.origin_ecef[2] east, north, up = _ecef_to_enu(dx, dy, dz, self.origin_lat_rad, self.origin_lon_rad) x = east y = north z = up else: x = lon * 111320.0 * math.cos(math.radians(lat)) # 近似米 y = lat * 110540.0 # 近似米 z = gps.altitude # 粗略使用GPS高度 # GPS天线 → 车辆中心的平移修正 (ENU坐标系) dx, dy, dz_offset = self.offset_xyz x += dx y += dy z += dz_offset return (x, y, z, heading_rad, vx, vy) def _control_to_pwm(self, forward: int, turn: int) -> tuple: """ 将控制信号转换为PWM值 Args: forward: 前进信号 (-100 到 100) turn: 转向信号 (-100 到 100, 负为右转, 正为左转) Returns: (steering_pwm, throttle_pwm) 单位us """ # 油门: -100~100 映射到 1000~2000us, 中位1500us throttle_pwm = int(1500 + forward * 5.0) # 转向: -100~100 映射到 1000~2000us, 中位1500us # 注意: 正值左转(1000), 负值右转(2000) steering_pwm = int(1500 - turn * 5.0) return (steering_pwm, throttle_pwm) def run(self): """运行实时控制循环""" if not self.mower_ctrl: print("[ERROR] 请先加载路径文件") return print("[INFO] 开始实时控制... (按 Ctrl+C 停止)") print("[INFO] 等待GPS数据...") try: start_time = time.time() last_stats_time = start_time # 等待第一个有效的GPS数据 gps_ready = False while not gps_ready: gps_data, imu_data = self.receiver.receive_packet() if gps_data: if gps_data.position_quality > 0 and gps_data.satellite_count >= 4: self.latest_gps = gps_data gps_ready = True print(f"[INFO] GPS就绪: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}") else: print(f"[WARN] GPS质量不足: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}") if imu_data: self.latest_imu = imu_data print("[INFO] 开始控制循环") while True: current_time = time.time() # 接收数据 (非阻塞式,快速轮询) gps_data, imu_data = self.receiver.receive_packet() if gps_data: self.latest_gps = gps_data self.last_gps_time = current_time if imu_data: self.latest_imu = imu_data self.last_imu_time = current_time # 控制周期检查 (74Hz) if current_time - self.last_control_time >= self.control_period: if self.latest_gps: # 转换GPS数据到本地坐标 x, y, z, heading, vx, vy = self._convert_gps_to_local(self.latest_gps) # 更新控制器状态 (只用x,y) self.mower_ctrl.update_gps((x, y), heading, (vx, vy), current_time) if self.latest_imu: accel = (self.latest_imu.accel_x, self.latest_imu.accel_y, self.latest_imu.accel_z) gyro = (self.latest_imu.gyro_x, self.latest_imu.gyro_y, self.latest_imu.gyro_z) self.mower_ctrl.update_imu(accel, gyro, current_time) # 计算控制信号 control_output = self.mower_ctrl.compute_control(current_time) forward_signal = control_output['forward'] # -100~100 turn_signal = control_output['turn'] # -100~100 # 转换为PWM并发送 steering_pwm, throttle_pwm = self._control_to_pwm(forward_signal, turn_signal) self.cmd_sender.send_control_command(steering_pwm, throttle_pwm) self.control_count += 1 self.last_control_time = current_time # 打印控制信息 (降低频率) if self.control_count % 74 == 0: # 每秒打印一次 status = control_output['info'].get('status', 'running') xte = control_output['info'].get('xte', 0) # 数学角度转地理角度:heading_err_deg # 注意 control_output['info']['heading_err'] 是弧度 heading_err_deg = math.degrees(control_output['info'].get('heading_err', 0)) # 获取最新的姿态角 (优先使用 GPS 数据,IMU 作为辅助或高频补充,这里直接用 GPS 结构体中的角度) # GPSData 中: heading_angle(0-360), pitch_angle, roll_angle # 都是角度制 gps_heading = self.latest_gps.heading_angle gps_pitch = self.latest_gps.pitch_angle gps_roll = self.latest_gps.roll_angle print(f"[LOG] POS_ENU: {x:.3f}, {y:.3f}, {z:.3f} | " f"ATT(H/P/R): {gps_heading:.2f}°, {gps_pitch:.2f}°, {gps_roll:.2f}° | " f"CTRL: S={status}, F={forward_signal}, T={turn_signal}, Err={xte:.2f}m") # 检查是否完成 if control_output['info'].get('status') == 'finished': print("[INFO] 路径跟踪完成!") break # 统计信息 (每10秒) # if current_time - last_stats_time > 10.0: # print(f"\n========== 运行统计 ==========") # print(f"运行时间: {current_time - start_time:.1f}s") # print(f"控制次数: {self.control_count}") # print(f"GPS数据包: {self.receiver.gps_count}") # print(f"IMU数据包: {self.receiver.imu_count}") # print(f"命令发送: {self.cmd_sender.get_stats()}") # print(f"错误计数: {self.receiver.error_count}") # if self.latest_gps: # print(f"GPS状态: 质量={self.latest_gps.position_quality}, " # f"卫星={self.latest_gps.satellite_count}, " # f"航向={self.latest_gps.heading_angle:.1f}°") # print(f"==============================\n") # last_stats_time = current_time except KeyboardInterrupt: print("\n[INFO] 用户中断") finally: # 发送停止命令 (中位值) print("[INFO] 发送停止命令...") for _ in range(5): self.cmd_sender.send_control_command(1500, 1500) time.sleep(0.02) # 打印最终统计 print("\n========== 最终统计 ==========") print(f"总控制次数: {self.control_count}") print(f"GPS数据包: {self.receiver.gps_count}") print(f"IMU数据包: {self.receiver.imu_count}") print(f"命令发送: {self.cmd_sender.get_stats()}") print(f"错误计数: {self.receiver.error_count}") print("==============================\n") def main(): """主函数""" # 配置 PORT = "COM17" # 根据实际情况修改 BAUDRATE = 921600 PATH_FILE = "gecaolujing2.txt" # 路径文件 OFFSET_XYZ = (0.0, 0.0, 0.0) # 车辆中心相对GPS天线的ENU偏移(米): (东, 北, 天) # 原点经纬度 (支持 GGA 报文或简略格式) # 示例GGA: "$GNGGA,060956.700,3949.8890014,N,11616.7555551,E,4,20,0.68,46.621,M,-6.679,M,1.0,0409*7F" ORIGIN_GEO = "$GNGGA,060956.700,3949.8890014,N,11616.7555551,E,4,20,0.68,46.621,M,-6.679,M,1.0,0409*7F" ORIGIN_ALT = None # 原点海拔(米),可选。若GGA报文中包含海拔,会自动使用报文中的海拔。 # 创建控制器 controller = RealtimeController( PORT, BAUDRATE, offset_xyz=OFFSET_XYZ, origin_geo=ORIGIN_GEO, origin_alt_m=ORIGIN_ALT ) # 加载路径 if not controller.load_path(PATH_FILE): print("[ERROR] 路径加载失败,退出") return # 连接串口 if not controller.connect(): print("[ERROR] 串口连接失败,退出") return # 运行控制循环 try: controller.run() finally: controller.disconnect() if __name__ == "__main__": main()