#!/usr/bin/env python3
|
# -*- coding: utf-8 -*-
|
"""
|
实时闭环控制系统
|
接收 GPS/IMU 数据 → 运动控制算法 → 下发控制命令
|
"""
|
|
import serial
|
import struct
|
import time
|
import json
|
import math
|
from typing import Optional
|
from gps_imu_receiver import GPSIMUReceiver, GPSData, IMUData
|
from mower_controller import MowerController, wrap_angle
|
|
|
class ControlCommandSender:
|
"""控制命令发送器 - 发送转向和油门控制信号给STM32"""
|
|
# 协议常量
|
FRAME_HEADER1 = 0xAA
|
FRAME_HEADER2 = 0x55
|
TYPE_CONTROL = 0x10 # 控制命令类型
|
FRAME_FOOTER1 = 0x0D
|
FRAME_FOOTER2 = 0x0A
|
|
def __init__(self, serial_port: serial.Serial):
|
"""
|
初始化控制命令发送器
|
|
Args:
|
serial_port: 已打开的串口对象
|
"""
|
self.serial = serial_port
|
self.send_count = 0
|
|
def send_control_command(self, steering_pwm: int, throttle_pwm: int) -> bool:
|
"""
|
发送控制命令到STM32
|
|
协议格式: AA 55 10 04 00 STEER(2) THROTTLE(2) CHECKSUM(2) 0D 0A
|
|
Args:
|
steering_pwm: 转向PWM值 (1000-2000us)
|
throttle_pwm: 油门PWM值 (1000-2000us)
|
|
Returns:
|
发送成功返回True
|
"""
|
try:
|
# 限制PWM范围
|
steering_pwm = max(1000, min(2000, steering_pwm))
|
throttle_pwm = max(1000, min(2000, throttle_pwm))
|
|
# 构建数据包
|
# 帧头 (2B) + 类型 (1B) + 长度 (2B) + 数据 (4B) + 校验和 (2B) + 帧尾 (2B)
|
data_len = 4 # 2字节转向 + 2字节油门
|
|
payload = struct.pack('<HH', steering_pwm, throttle_pwm)
|
|
# 构建完整帧用于计算校验和
|
checksum_data = (
|
bytes([self.TYPE_CONTROL]) +
|
struct.pack('<H', data_len) +
|
payload
|
)
|
|
# 计算16位累加和校验
|
checksum = sum(checksum_data) & 0xFFFF
|
|
# 完整数据包
|
packet = (
|
bytes([self.FRAME_HEADER1, self.FRAME_HEADER2]) +
|
checksum_data +
|
struct.pack('<H', checksum) +
|
bytes([self.FRAME_FOOTER1, self.FRAME_FOOTER2])
|
)
|
|
# 发送
|
self.serial.write(packet)
|
self.serial.flush()
|
|
self.send_count += 1
|
return True
|
|
except Exception as e:
|
print(f"[ERROR] 控制命令发送失败: {e}")
|
return False
|
|
def get_stats(self):
|
"""获取发送统计"""
|
return self.send_count
|
|
|
class RealtimeController:
|
"""实时控制器 - 整合GPS/IMU接收、控制算法和命令发送"""
|
|
def __init__(self, port: str, baudrate: int = 921600):
|
"""
|
初始化实时控制器
|
|
Args:
|
port: 串口设备名
|
baudrate: 波特率
|
"""
|
self.port = port
|
self.baudrate = baudrate
|
|
# GPS/IMU接收器
|
self.receiver = GPSIMUReceiver(port, baudrate)
|
|
# 控制命令发送器 (共用同一个串口)
|
self.cmd_sender = None
|
|
# 运动控制器
|
self.mower_ctrl = None
|
|
# 状态变量
|
self.last_gps_time = 0
|
self.last_imu_time = 0
|
self.control_period = 1.0 / 74.0 # 74Hz控制频率
|
self.last_control_time = 0
|
|
# 数据缓存
|
self.latest_gps: Optional[GPSData] = None
|
self.latest_imu: Optional[IMUData] = None
|
|
# 统计
|
self.control_count = 0
|
|
def load_path(self, path_file: str) -> bool:
|
"""
|
加载路径文件并初始化控制器
|
|
Args:
|
path_file: JSON路径文件
|
|
Returns:
|
成功返回True
|
"""
|
try:
|
with open(path_file, 'r') as f:
|
path_points = json.load(f)
|
|
print(f"[INFO] 加载路径: {len(path_points)} 个点")
|
|
# 使用sim_offline.py中的高精度参数
|
params = {
|
# 核心控制增益
|
'k_heading': 2.0,
|
'k_xte': 0.8,
|
'k_heading_d': 0.1,
|
'k_heading_i': 0.015,
|
'k_xte_i': 0.02,
|
|
# 速度控制
|
'base_speed': 0.8,
|
'max_forward_mps': 1.2,
|
'min_follow_speed': 0.40,
|
'max_reverse_mps': 0.25,
|
'max_yawrate': math.pi/2.5,
|
|
# 前视距离
|
'lookahead_min': 0.35,
|
'lookahead_max': 1.2,
|
'lookahead_time': 2.0,
|
|
# 误差容忍度
|
'max_xte': 1.5,
|
'goal_tolerance': 0.1,
|
'large_err_threshold': 0.5,
|
|
# 滤波参数
|
'yawrate_alpha': 0.85,
|
|
# 速度缩放增益
|
'curv_gain_scale': 2.0,
|
'heading_err_gain_scale': 1.5,
|
'xte_gain_scale': 1.0,
|
|
# 终点减速区域
|
'slow_zone': 1.5,
|
'final_approach_dist': 0.8,
|
}
|
|
self.mower_ctrl = MowerController(path_points, params=params)
|
print("[INFO] 运动控制器初始化完成")
|
return True
|
|
except Exception as e:
|
print(f"[ERROR] 加载路径失败: {e}")
|
return False
|
|
def connect(self) -> bool:
|
"""连接串口"""
|
if not self.receiver.connect():
|
return False
|
|
# 创建控制命令发送器 (共用串口对象)
|
self.cmd_sender = ControlCommandSender(self.receiver.serial)
|
print("[INFO] 控制命令发送器就绪")
|
return True
|
|
def disconnect(self):
|
"""断开连接"""
|
self.receiver.disconnect()
|
|
def _convert_gps_to_local(self, gps: GPSData) -> tuple:
|
"""
|
将GPS数据转换为本地坐标 (简化版,实际需要根据起点转换)
|
|
Returns:
|
(x, y, heading_rad, vx, vy)
|
"""
|
# TODO: 实际应用中需要将经纬度转换为相对起点的XY坐标
|
# 这里简化处理,假设GPS已提供相对坐标或使用UTM转换
|
|
# 航向角转换为弧度 (GPS输出0-360°,北为0°顺时针)
|
# 需要转换为数学坐标系 (东为0°逆时针)
|
heading_rad = math.radians(90 - gps.heading_angle)
|
heading_rad = wrap_angle(heading_rad)
|
|
# 速度分量 (GPS提供东北天坐标系)
|
vx = gps.east_velocity # 东向速度 = X轴速度
|
vy = gps.north_velocity # 北向速度 = Y轴速度
|
|
# 位置 (简化,实际需要坐标转换)
|
x = gps.longitude * 111320.0 * math.cos(math.radians(gps.latitude)) # 近似米
|
y = gps.latitude * 110540.0 # 近似米
|
|
return (x, y, heading_rad, vx, vy)
|
|
def _control_to_pwm(self, forward: int, turn: int) -> tuple:
|
"""
|
将控制信号转换为PWM值
|
|
Args:
|
forward: 前进信号 (-100 到 100)
|
turn: 转向信号 (-100 到 100, 负为右转, 正为左转)
|
|
Returns:
|
(steering_pwm, throttle_pwm) 单位us
|
"""
|
# 油门: -100~100 映射到 1000~2000us, 中位1500us
|
throttle_pwm = int(1500 + forward * 5.0)
|
|
# 转向: -100~100 映射到 1000~2000us, 中位1500us
|
# 注意: 正值左转(1000), 负值右转(2000)
|
steering_pwm = int(1500 - turn * 5.0)
|
|
return (steering_pwm, throttle_pwm)
|
|
def run(self):
|
"""运行实时控制循环"""
|
if not self.mower_ctrl:
|
print("[ERROR] 请先加载路径文件")
|
return
|
|
print("[INFO] 开始实时控制... (按 Ctrl+C 停止)")
|
print("[INFO] 等待GPS数据...")
|
|
try:
|
start_time = time.time()
|
last_stats_time = start_time
|
|
# 等待第一个有效的GPS数据
|
gps_ready = False
|
while not gps_ready:
|
gps_data, imu_data = self.receiver.receive_packet()
|
|
if gps_data:
|
if gps_data.position_quality > 0 and gps_data.satellite_count >= 4:
|
self.latest_gps = gps_data
|
gps_ready = True
|
print(f"[INFO] GPS就绪: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}")
|
else:
|
print(f"[WARN] GPS质量不足: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}")
|
|
if imu_data:
|
self.latest_imu = imu_data
|
|
print("[INFO] 开始控制循环")
|
|
while True:
|
current_time = time.time()
|
|
# 接收数据 (非阻塞式,快速轮询)
|
gps_data, imu_data = self.receiver.receive_packet()
|
|
if gps_data:
|
self.latest_gps = gps_data
|
self.last_gps_time = current_time
|
|
if imu_data:
|
self.latest_imu = imu_data
|
self.last_imu_time = current_time
|
|
# 控制周期检查 (74Hz)
|
if current_time - self.last_control_time >= self.control_period:
|
if self.latest_gps:
|
# 转换GPS数据到本地坐标
|
x, y, heading, vx, vy = self._convert_gps_to_local(self.latest_gps)
|
|
# 更新控制器状态
|
self.mower_ctrl.update_gps((x, y), heading, (vx, vy), current_time)
|
|
if self.latest_imu:
|
accel = (self.latest_imu.accel_x, self.latest_imu.accel_y, self.latest_imu.accel_z)
|
gyro = (self.latest_imu.gyro_x, self.latest_imu.gyro_y, self.latest_imu.gyro_z)
|
self.mower_ctrl.update_imu(accel, gyro, current_time)
|
|
# 计算控制信号
|
control_output = self.mower_ctrl.compute_control(current_time)
|
forward_signal = control_output['forward'] # -100~100
|
turn_signal = control_output['turn'] # -100~100
|
|
# 转换为PWM并发送
|
steering_pwm, throttle_pwm = self._control_to_pwm(forward_signal, turn_signal)
|
self.cmd_sender.send_control_command(steering_pwm, throttle_pwm)
|
|
self.control_count += 1
|
self.last_control_time = current_time
|
|
# 打印控制信息 (降低频率)
|
if self.control_count % 74 == 0: # 每秒打印一次
|
status = control_output['info'].get('status', 'running')
|
xte = control_output['info'].get('xte', 0)
|
heading_err_deg = math.degrees(control_output['info'].get('heading_err', 0))
|
|
print(f"[CTRL] 状态: {status}, "
|
f"前进: {forward_signal:+4d}, 转向: {turn_signal:+4d}, "
|
f"PWM: S={steering_pwm}, T={throttle_pwm}, "
|
f"横向误差: {xte:.2f}m, 航向误差: {heading_err_deg:+.1f}°")
|
|
# 检查是否完成
|
if control_output['info'].get('status') == 'finished':
|
print("[INFO] 路径跟踪完成!")
|
break
|
|
# 统计信息 (每10秒)
|
if current_time - last_stats_time > 10.0:
|
print(f"\n========== 运行统计 ==========")
|
print(f"运行时间: {current_time - start_time:.1f}s")
|
print(f"控制次数: {self.control_count}")
|
print(f"GPS数据包: {self.receiver.gps_count}")
|
print(f"IMU数据包: {self.receiver.imu_count}")
|
print(f"命令发送: {self.cmd_sender.get_stats()}")
|
print(f"错误计数: {self.receiver.error_count}")
|
|
if self.latest_gps:
|
print(f"GPS状态: 质量={self.latest_gps.position_quality}, "
|
f"卫星={self.latest_gps.satellite_count}, "
|
f"航向={self.latest_gps.heading_angle:.1f}°")
|
|
print(f"==============================\n")
|
last_stats_time = current_time
|
|
except KeyboardInterrupt:
|
print("\n[INFO] 用户中断")
|
|
finally:
|
# 发送停止命令 (中位值)
|
print("[INFO] 发送停止命令...")
|
for _ in range(5):
|
self.cmd_sender.send_control_command(1500, 1500)
|
time.sleep(0.02)
|
|
# 打印最终统计
|
print("\n========== 最终统计 ==========")
|
print(f"总控制次数: {self.control_count}")
|
print(f"GPS数据包: {self.receiver.gps_count}")
|
print(f"IMU数据包: {self.receiver.imu_count}")
|
print(f"命令发送: {self.cmd_sender.get_stats()}")
|
print(f"错误计数: {self.receiver.error_count}")
|
print("==============================\n")
|
|
|
def main():
|
"""主函数"""
|
# 配置
|
PORT = "COM17" # 根据实际情况修改
|
BAUDRATE = 921600
|
PATH_FILE = "example_path.json" # 路径文件
|
|
# 创建控制器
|
controller = RealtimeController(PORT, BAUDRATE)
|
|
# 加载路径
|
if not controller.load_path(PATH_FILE):
|
print("[ERROR] 路径加载失败,退出")
|
return
|
|
# 连接串口
|
if not controller.connect():
|
print("[ERROR] 串口连接失败,退出")
|
return
|
|
# 运行控制循环
|
try:
|
controller.run()
|
finally:
|
controller.disconnect()
|
|
|
if __name__ == "__main__":
|
main()
|