"""
|
MowerController
|
|
控制器假设与契约:
|
- 坐标系:XY 平面单位为米(RTK 提供),Z 忽略用于平面跟踪。
|
- 航向(heading):弧度,范围任意,控制器会处理环绕。
|
- GPS 频率(位置/速度/heading/attitude)约 10Hz;IMU(加速度、角速度)100Hz。
|
- 控制输出频率:74Hz,返回两个信号:forward(-100..100,对应 -1..+1 m/s)和 turn(-100..100,对应 -pi/2..+pi/2 rad/s,正为左转)。
|
- 路径文件:JSON 列表 [[x,y],[x,y],...]
|
|
设计说明:
|
- 使用纯追踪(pure pursuit)方法选取前视点,并基于角度误差使用 P 控制计算期望偏航速率(rad/s)。
|
- 前进速度与偏航误差耦合:偏航误差大时降低前进速度以保证可转弯能力。
|
|
接口:
|
- controller = MowerController(path_points, params)
|
- controller.update_gps(pos_xy, heading, vel_xy, timestamp)
|
- controller.update_imu(accel3, gyro3, timestamp)
|
- controller.compute_control(timestamp) -> {'forward': int, 'turn': int, 'info': {...}}
|
|
"""
|
from math import atan2, hypot, sin, cos, pi, acos
|
import time
|
|
|
def wrap_angle(a):
|
# wrap to [-pi, pi)
|
while a >= pi:
|
a -= 2 * pi
|
while a < -pi:
|
a += 2 * pi
|
return a
|
|
|
class MowerController:
|
def __init__(self, path_points, params=None):
|
# path_points: list of [x,y]
|
# 原始路径点
|
original_path = [tuple(p) for p in path_points]
|
if len(original_path) == 0:
|
raise ValueError("path_points cannot be empty")
|
|
# 保存原始路径用于拐点检测
|
self.original_path = original_path
|
|
# 路径加密(densify):避免首段过长导致纯追踪前视点跳到远处
|
p_params = params or {}
|
max_seg_len = p_params.get('max_segment_length', 0.6) # 默认将长段切分为 <=0.6m
|
densified = []
|
for i in range(len(original_path) - 1):
|
x1, y1 = original_path[i]
|
x2, y2 = original_path[i + 1]
|
densified.append((x1, y1))
|
dx = x2 - x1
|
dy = y2 - y1
|
seg_len = hypot(dx, dy)
|
if seg_len > max_seg_len and seg_len > 1e-6:
|
# 需要插入若干中间点(不包含终点,终点下一循环添加)
|
steps = int(seg_len // max_seg_len)
|
# 使用均匀线性插值(1..steps),防止首段过长纯追踪直接跳过
|
for k in range(1, steps + 1):
|
ratio = k / (steps + 1)
|
densified.append((x1 + dx * ratio, y1 + dy * ratio))
|
# 添加最后一个点
|
densified.append(original_path[-1])
|
self.path = densified
|
self.n = len(self.path)
|
# 预计算每段长度及累积长度用于进度计算
|
self.segment_lengths = []
|
self.cum_lengths = [0.0]
|
for i in range(self.n - 1):
|
ax, ay = self.path[i]
|
bx, by = self.path[i+1]
|
L = hypot(bx-ax, by-ay)
|
self.segment_lengths.append(L)
|
self.cum_lengths.append(self.cum_lengths[-1] + L)
|
self.total_length = self.cum_lengths[-1] if self.cum_lengths else 0.0
|
# 进度单调性保护变量
|
self.last_progress = 0.0
|
|
# parameters and gains
|
p = params or {}
|
self.max_forward_mps = p.get('max_forward_mps', 0.6) # 更保守的速度
|
self.max_reverse_mps = p.get('max_reverse_mps', 0.2) # 保守的倒车速度
|
self.max_yawrate = p.get('max_yawrate', pi/4) # 更保守的转向速率
|
|
# 软启动和加减速
|
self.accel = p.get('accel', 0.2) # m/s^2
|
self.current_speed_target = 0.0 # 目标速度
|
self.current_speed = 0.0 # 实际速度
|
|
# 控制增益(提高响应速度和精度)
|
self.k_heading = p.get('k_heading', 1.8) # 大幅提高转向增益,加快航向收敛
|
self.k_heading_d = p.get('k_heading_d', 0.25) # 增加微分作用,抑制振荡
|
self.k_xte = p.get('k_xte', 1.2) # 大幅提高横向误差增益,激进纠正偏离
|
self.k_curv = p.get('k_curv', 0.2) # 降低曲率前馈增益
|
self.base_speed = p.get('base_speed', 0.5) # 提高基础速度
|
|
# 路径跟踪参数(减小lookahead以提高精度)
|
self.lookahead_time = p.get('lookahead_time', 2.0) # 减小前视时间
|
# 减小最小前视距离以提高轨迹精度
|
self.lookahead_min = p.get('lookahead_min', 0.35)
|
self.lookahead_max = p.get('lookahead_max', 1.5) # 减小最大前视距离
|
self.goal_tolerance = p.get('goal_tolerance', 0.3) # m
|
self.slow_zone = p.get('slow_zone', 1.5) # 减小减速区域
|
self.final_approach_dist = p.get('final_approach_dist', 0.8) # 减小精确接近区域
|
|
# 新增:横向误差相关
|
self.max_xte = p.get('max_xte', 1.0) # 减小最大横向误差限制,提高精度
|
self.xte_gain_scale = p.get('xte_gain_scale', 1.0) # 移除横向误差缩放,使用完整增益
|
|
# 速度控制缩放参数
|
self.curv_gain_scale = p.get('curv_gain_scale', 2.0) # 曲率缩放增益
|
self.heading_err_gain_scale = p.get('heading_err_gain_scale', 1.5) # 航向误差缩放增益
|
|
# 状态估计和滤波
|
self.yawrate_filter = 0.0
|
self.yawrate_alpha = p.get('yawrate_alpha', 0.90) # 进一步增加平滑度
|
self.last_heading_err = 0.0 # 用于D控制
|
self.last_time = None # 用于计算dt
|
self.last_curv = 0.0 # 用于平滑曲率变化
|
|
# 累积误差控制
|
self.heading_err_sum = 0.0 # 航向误差积分
|
self.xte_sum = 0.0 # 横向误差积分
|
self.max_sum = 2.0 # 积分限幅
|
self.k_heading_i = p.get('k_heading_i', 0.01) # 航向误差积分增益
|
self.k_xte_i = p.get('k_xte_i', 0.01) # 横向误差积分增益
|
|
# 错误恢复
|
self.consecutive_large_err = 0 # 连续大误差计数
|
self.large_err_threshold = p.get('large_err_threshold', 0.5) # 大误差阈值
|
self.max_large_err_count = 10 # 最大连续大误差次数
|
|
# (日志文件将在下面统一创建)
|
|
# state (most recent readings)
|
self.pos = None # (x,y)
|
self.heading = None # rad
|
self.vel = (0.0, 0.0) # vx, vy in m/s (ground frame)
|
self.imu_accel = None
|
self.imu_gyro = None
|
|
# control stages
|
self.STAGE_GOTO_START = 'goto_start' # 移动到起点阶段
|
self.STAGE_FOLLOW_PATH = 'follow_path' # 跟随路径阶段
|
self.stage = self.STAGE_GOTO_START
|
|
# 起点接近参数 - 收紧以确保真正到达起点
|
self.start_tolerance = p.get('start_tolerance', 0.12) # 位置容差(收紧到12cm)
|
self.start_heading_tolerance = p.get('start_heading_tolerance', 0.01) # 航向容差 (~14°,收紧)
|
self.start_approach_dist = p.get('start_approach_dist', 1.5) # 减速距离
|
self.start_slow_speed = p.get('start_slow_speed', 0.25) # 接近速度
|
self.start_min_speed = p.get('start_min_speed', 0.25) # 提高最小速度
|
# 新增:到起点时先转向再前进的阈值(若与起点方向误差大于此阈值,则先转向)
|
self.start_turn_first_heading = p.get('start_turn_first_heading', pi/50) # ~15度
|
# 路径跟随阶段的最小前进速度(防止映射/缩放导致速度归零)
|
self.min_follow_speed = p.get('min_follow_speed', 0.08)
|
# Recovery 模式参数
|
self.in_recovery = False
|
self.recovery_dist = p.get('recovery_dist', 2.0)
|
self.recovery_exit_dist = p.get('recovery_exit_dist', 0.8)
|
self.recovery_speed = p.get('recovery_speed', 0.55)
|
|
# progress - 只使用点索引,不使用segment
|
self.current_target_idx = 0
|
self.nearest_path_idx = 0 # 最近路径点索引
|
self.finished = False
|
# goto_start 子状态:'rotate'|'move'|'align',保证顺序执行且不反复切换
|
self.goto_substage = 'rotate'
|
|
# 拐点转向参数
|
self.waypoint_approach_mode = False # 是否在接近拐点模式
|
self.waypoint_turn_mode = False # 是否在拐点转向模式
|
self.waypoint_turn_target_idx = None # 当前转向的拐点在原始路径中的索引
|
self.waypoint_turn_tolerance = p.get('waypoint_turn_tolerance', 0.12) # 到达拐点的距离容差(12cm)
|
self.waypoint_approach_dist = p.get('waypoint_approach_dist', 0.8) # 开始减速接近拐点的距离(80cm)
|
self.waypoint_turn_heading_tol = p.get('waypoint_turn_heading_tol', 0.1) # 转向完成的航向容差(~6度)
|
self.waypoint_turned = set() # 记录已经转向过的拐点,避免重复转向
|
# 创建日志文件
|
self.log_file = open('controller_log.csv', 'w')
|
# 简化日志格式:移除segment相关
|
self.log_file.write('t,x,y,heading,target_x,target_y,target_dist,target_idx,xte,heading_err,forward,turn,speed,stage,status\n')
|
|
def update_gps(self, pos_xy, heading, vel_xy=None, timestamp=None):
|
"""Update GPS-like measurements at ~10Hz.
|
|
pos_xy: (x,y) in meters
|
heading: radians
|
vel_xy: (vx, vy) in m/s in global frame (optional)
|
"""
|
self.pos = tuple(pos_xy)
|
self.heading = float(heading)
|
if vel_xy is not None:
|
self.vel = tuple(vel_xy)
|
|
def update_imu(self, accel3, gyro3, timestamp=None):
|
"""Update IMU (100Hz). accel3 and gyro3 are 3-element tuples."""
|
self.imu_accel = tuple(accel3)
|
self.imu_gyro = tuple(gyro3)
|
|
def _calc_xte(self, p1, p2, pos):
|
"""Calculate cross track error (perpendicular distance to line segment)"""
|
x, y = pos
|
x1, y1 = p1
|
x2, y2 = p2
|
|
# Convert to local coordinates
|
dx = x2 - x1
|
dy = y2 - y1
|
l2 = dx*dx + dy*dy
|
|
if l2 == 0: # points are identical
|
return hypot(x - x1, y - y1)
|
|
# Project point onto line
|
t = ((x - x1)*dx + (y - y1)*dy) / l2
|
|
if t < 0: # before start
|
return hypot(x - x1, y - y1)
|
elif t > 1: # after end
|
return hypot(x - x2, y - y2)
|
else: # projection point lies on segment
|
px = x1 + t*dx
|
py = y1 + t*dy
|
xte = hypot(x - px, y - py)
|
# Determine sign based on which side of the line the point is on
|
sign = 1 if (x - px)*dy - (y - py)*dx > 0 else -1
|
return sign * xte
|
|
def _is_waypoint_turn_needed(self, idx):
|
"""
|
检测路径点idx是否是拐点(需要原地转向)
|
|
拐点定义:前后两段路径方向变化超过45度
|
"""
|
if idx <= 0 or idx >= len(self.path) - 1:
|
return False
|
|
# 前一段方向
|
p_prev = self.path[idx - 1]
|
p_curr = self.path[idx]
|
p_next = self.path[idx + 1]
|
|
dx1 = p_curr[0] - p_prev[0]
|
dy1 = p_curr[1] - p_prev[1]
|
dx2 = p_next[0] - p_curr[0]
|
dy2 = p_next[1] - p_curr[1]
|
|
# 计算两段方向
|
heading1 = atan2(dy1, dx1)
|
heading2 = atan2(dy2, dx2)
|
|
# 方向变化角度
|
angle_diff = abs(wrap_angle(heading2 - heading1))
|
|
# 如果方向变化超过45度,认为是拐点
|
return angle_diff > pi / 4
|
|
def _find_nearest_original_waypoint(self):
|
"""
|
在原始路径中找到最接近当前位置的拐点
|
|
Returns: (waypoint_idx, distance) 或 (None, None)
|
"""
|
if self.pos is None:
|
return None, None
|
|
x, y = self.pos
|
nearest_waypoint_idx = None
|
nearest_dist = float('inf')
|
|
# 遍历原始路径的所有拐点
|
for i in range(1, len(self.original_path) - 1):
|
# 检查是否是拐点
|
p_prev = self.original_path[i - 1]
|
p_curr = self.original_path[i]
|
p_next = self.original_path[i + 1]
|
|
dx1 = p_curr[0] - p_prev[0]
|
dy1 = p_curr[1] - p_prev[1]
|
dx2 = p_next[0] - p_curr[0]
|
dy2 = p_next[1] - p_curr[1]
|
|
heading1 = atan2(dy1, dx1)
|
heading2 = atan2(dy2, dx2)
|
angle_diff = abs(wrap_angle(heading2 - heading1))
|
|
# 如果是拐点(方向变化>45度)
|
if angle_diff > pi / 4:
|
dist = hypot(p_curr[0] - x, p_curr[1] - y)
|
if dist < nearest_dist:
|
nearest_dist = dist
|
nearest_waypoint_idx = i
|
|
return nearest_waypoint_idx, nearest_dist
|
|
def _get_target_heading_at_waypoint(self, idx):
|
"""获取原始路径点idx处的目标航向(下一段的方向)"""
|
if idx >= len(self.original_path) - 1:
|
# 最后一个点,返回前一段方向
|
if idx > 0:
|
dx = self.original_path[idx][0] - self.original_path[idx-1][0]
|
dy = self.original_path[idx][1] - self.original_path[idx-1][1]
|
return atan2(dy, dx)
|
return 0.0
|
|
# 返回到下一个点的方向
|
dx = self.original_path[idx + 1][0] - self.original_path[idx][0]
|
dy = self.original_path[idx + 1][1] - self.original_path[idx][1]
|
return atan2(dy, dx)
|
|
def _find_lookahead_point(self, lookahead_dist):
|
"""
|
简化的Pure Pursuit前视点查找 - 只使用点索引
|
|
从nearest_path_idx开始向前搜索,找到第一个距离>=lookahead_dist的点
|
|
Returns: (lookahead_xy, curvature, lookahead_idx, nearest_idx)
|
"""
|
if self.pos is None:
|
return self.path[0], 0.0, 0, 0
|
|
x, y = self.pos
|
|
# 1. 找最近点 - 如果离路径很远,使用全局搜索;否则使用窗口搜索
|
# 先检查当前nearest_path_idx的距离
|
px, py = self.path[self.nearest_path_idx]
|
current_nearest_dist = hypot(px - x, py - y)
|
|
# 如果距离>1.0m,使用全局搜索
|
if current_nearest_dist > 1.0:
|
search_start = 0
|
search_end = len(self.path)
|
else:
|
# 使用窗口搜索
|
search_start = max(0, self.nearest_path_idx - 5)
|
search_end = min(len(self.path), self.nearest_path_idx + 30)
|
|
nearest_idx = self.nearest_path_idx
|
nearest_dist = float('inf')
|
|
for i in range(search_start, search_end):
|
px, py = self.path[i]
|
d = hypot(px - x, py - y)
|
if d < nearest_dist:
|
nearest_dist = d
|
nearest_idx = i
|
|
self.nearest_path_idx = nearest_idx # 更新最近点索引
|
|
# 2. 从nearest_idx开始向前搜索lookahead点
|
lookahead_idx = nearest_idx
|
for i in range(nearest_idx, len(self.path)):
|
px, py = self.path[i]
|
d = hypot(px - x, py - y)
|
if d >= lookahead_dist:
|
lookahead_idx = i
|
break
|
else:
|
# 没找到,使用最后一个点
|
lookahead_idx = len(self.path) - 1
|
|
# 3. 估计曲率(基于nearest_idx附近的点)
|
curvature = 0.0
|
if 1 <= nearest_idx < len(self.path) - 1:
|
p1 = self.path[nearest_idx - 1]
|
p2 = self.path[nearest_idx]
|
p3 = self.path[nearest_idx + 1]
|
|
v1 = (p2[0] - p1[0], p2[1] - p1[1])
|
v2 = (p3[0] - p2[0], p3[1] - p2[1])
|
|
l1 = hypot(v1[0], v1[1])
|
l2 = hypot(v2[0], v2[1])
|
|
if l1 > 1e-6 and l2 > 1e-6:
|
dot = v1[0]*v2[0] + v1[1]*v2[1]
|
cos_angle = max(-1.0, min(1.0, dot / (l1 * l2)))
|
angle_change = abs(pi - acos(cos_angle))
|
curvature = angle_change / max(l1, 1e-6)
|
|
return self.path[lookahead_idx], curvature, lookahead_idx, nearest_idx
|
|
def _find_nearest_original_waypoint(self):
|
"""
|
在原始路径中查找下一个未到达的拐点
|
按照路径顺序查找,不跳过
|
|
Returns: (waypoint_idx, distance) 或 (None, inf) 如果没有找到
|
"""
|
if not hasattr(self, 'original_path') or len(self.original_path) < 3:
|
return None, float('inf')
|
|
x, y = self.pos
|
|
# 遍历原始路径中的拐点(跳过起点和终点)
|
for i in range(1, len(self.original_path) - 1):
|
# 如果这个拐点已经转向过,跳过
|
if i in self.waypoint_turned:
|
continue
|
|
# 检查方向是否改变
|
prev_pt = self.original_path[i-1]
|
curr_pt = self.original_path[i]
|
next_pt = self.original_path[i+1]
|
|
# 计算前后方向向量
|
dx1 = curr_pt[0] - prev_pt[0]
|
dy1 = curr_pt[1] - prev_pt[1]
|
dx2 = next_pt[0] - curr_pt[0]
|
dy2 = next_pt[1] - curr_pt[1]
|
|
# 计算角度
|
angle1 = atan2(dy1, dx1)
|
angle2 = atan2(dy2, dx2)
|
angle_change = abs(wrap_angle(angle2 - angle1))
|
|
# 如果角度变化大于45度,认为是拐点
|
if angle_change > pi / 4:
|
# 找到第一个未完成的拐点,直接返回
|
dist = hypot(curr_pt[0] - x, curr_pt[1] - y)
|
return i, dist
|
|
return None, float('inf')
|
|
def _get_target_heading_at_waypoint(self, waypoint_idx):
|
"""
|
获取拐点处的目标航向(下一段路径的方向)
|
|
Args:
|
waypoint_idx: 拐点在原始路径中的索引
|
|
Returns: 目标航向(弧度)
|
"""
|
if waypoint_idx is None:
|
return 0.0 # 默认向东
|
|
# 如果是最后一个点,使用前一段的方向(保持不变)
|
if waypoint_idx >= len(self.original_path) - 1:
|
if waypoint_idx > 0:
|
prev_pt = self.original_path[waypoint_idx - 1]
|
curr_pt = self.original_path[waypoint_idx]
|
return atan2(curr_pt[1] - prev_pt[1], curr_pt[0] - prev_pt[0])
|
else:
|
return 0.0 # 只有一个点,默认向东
|
|
curr_pt = self.original_path[waypoint_idx]
|
next_pt = self.original_path[waypoint_idx + 1]
|
|
dx = next_pt[0] - curr_pt[0]
|
dy = next_pt[1] - curr_pt[1]
|
|
return atan2(dy, dx)
|
|
def _compute_xte_simple(self, pos):
|
"""
|
简化的横向误差计算 - 基于最近点
|
|
Returns: (xte, proj_x, proj_y)
|
"""
|
x, y = pos
|
nearest_idx = self.nearest_path_idx
|
|
# 计算到最近点的向量
|
px, py = self.path[nearest_idx]
|
|
# 如果有下一个点,计算到路径段的横向误差
|
if nearest_idx < len(self.path) - 1:
|
nx, ny = self.path[nearest_idx + 1]
|
|
# 段方向向量
|
dx = nx - px
|
dy = ny - py
|
seg_len2 = dx*dx + dy*dy
|
|
if seg_len2 > 1e-9:
|
# 投影到段上
|
t = ((x - px)*dx + (y - py)*dy) / seg_len2
|
t = max(0.0, min(1.0, t)) # 限制在[0,1]
|
|
proj_x = px + t * dx
|
proj_y = py + t * dy
|
|
# 横向误差(带符号)
|
xte_vec_x = x - proj_x
|
xte_vec_y = y - proj_y
|
xte_mag = hypot(xte_vec_x, xte_vec_y)
|
|
# 使用叉乘判断左右
|
cross = xte_vec_x * dy - xte_vec_y * dx
|
xte = xte_mag if cross > 0 else -xte_mag
|
|
return xte, proj_x, proj_y
|
|
# 否则直接用到最近点的距离
|
xte = hypot(x - px, y - py)
|
return xte, px, py
|
|
def compute_control(self, timestamp=None):
|
"""Compute forward and turn outputs mapped to [-100,100].
|
|
Returns dict with keys: forward(int), turn(int), info(dict)
|
"""
|
if self.finished:
|
return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}}
|
|
if self.pos is None or self.heading is None:
|
return {'forward': 0, 'turn': 0, 'info': {'status': 'no_state'}}
|
|
# ensure timestamp is numeric for logging/formatting
|
if timestamp is None:
|
timestamp = time.time()
|
|
# 计算时间增量
|
if self.last_time is None:
|
dt = 0.014 # 约74Hz
|
else:
|
dt = timestamp - self.last_time if timestamp else 0.014
|
self.last_time = timestamp if timestamp else (self.last_time + dt if self.last_time else 0.0)
|
|
x, y = self.pos
|
|
# 处理移动到起点阶段:使用持久子状态确保顺序执行:rotate -> move -> align
|
if self.stage == self.STAGE_GOTO_START:
|
start_point = self.path[0]
|
dx = start_point[0] - x
|
dy = start_point[1] - y
|
dist_to_start = hypot(dx, dy)
|
|
# 目标朝向指向起点
|
desired_heading_to_start = atan2(dy, dx)
|
heading_err_to_start = wrap_angle(desired_heading_to_start - self.heading)
|
|
# 初始化子状态(保守)
|
if not hasattr(self, 'goto_substage') or self.goto_substage is None:
|
self.goto_substage = 'rotate'
|
|
# 子状态:rotate -> move -> align
|
if self.goto_substage == 'rotate':
|
# 原地转向直到朝向误差足够小(移除距离退出条件,确保先转好)
|
if abs(heading_err_to_start) <= self.start_turn_first_heading:
|
self.goto_substage = 'move'
|
else:
|
yawrate_cmd = self.k_heading * heading_err_to_start * 1.3 # 稍微提高转向速度
|
turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100))
|
turn_signal = max(-100, min(100, turn_signal))
|
forward_signal = 0
|
# target is start_point for goto_start substage
|
target_x, target_y = start_point
|
target_dist = hypot(target_x - x, target_y - y)
|
target_idx = 0
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},'
|
f'0.000,{heading_err_to_start:.3f},{forward_signal},{turn_signal},0.000,{self.stage},rotate_to_start\n')
|
self.log_file.flush()
|
return {'forward': forward_signal, 'turn': turn_signal, 'info': {
|
'status': 'rotate_to_start', 'stage': self.stage, 'heading_err': heading_err_to_start,
|
'dist_to_start': dist_to_start
|
}}
|
|
if self.goto_substage == 'move':
|
# 在移动子状态期间,不会回到 rotate 子状态,直到显式切换
|
desired_heading = desired_heading_to_start
|
heading_err = wrap_angle(desired_heading - self.heading)
|
|
# 如果已经接近起点则进入 align 子状态
|
if dist_to_start < self.start_tolerance:
|
self.goto_substage = 'align'
|
else:
|
# 常规:面向起点并前进
|
if dist_to_start < self.start_approach_dist:
|
dist_ratio = dist_to_start / self.start_approach_dist
|
speed_scale = 0.4 + 0.6 * dist_ratio # 提高最小速度比例
|
base_target_speed = self.start_slow_speed + (self.base_speed - self.start_slow_speed) * dist_ratio
|
else:
|
speed_scale = 1.0
|
base_target_speed = min(self.base_speed * 0.8, 0.5) # 限制最大速度避免过冲
|
|
# 航向误差缩放更宽松
|
heading_ratio = abs(heading_err) / pi
|
err_scale = 0.5 + 0.5 * (1.0 - heading_ratio) # 更宽松
|
target_speed = max(self.start_min_speed * 1.5, base_target_speed * speed_scale * err_scale)
|
target_speed = max(target_speed, self.start_min_speed * 1.2) # 确保最小速度
|
|
yawrate_cmd = self.k_heading * heading_err
|
forward_signal = int(round((target_speed / self.max_forward_mps) * 100))
|
forward_signal = max(-100, min(100, forward_signal))
|
turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100))
|
turn_signal = max(-100, min(100, turn_signal))
|
|
target_x, target_y = start_point
|
target_dist = hypot(target_x - x, target_y - y)
|
target_idx = 0
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},'
|
f'0.000,{heading_err:.3f},{forward_signal},{turn_signal},{target_speed:.3f},{self.stage},moving_to_start\n')
|
self.log_file.flush()
|
|
return {'forward': forward_signal, 'turn': turn_signal, 'info': {
|
'status': 'moving_to_start', 'stage': self.stage, 'dist_to_start': dist_to_start,
|
'heading_err': heading_err
|
}}
|
|
# align 子状态:到达起点后进行原地朝向修正,完成后切换到跟踪
|
if self.goto_substage == 'align':
|
if len(self.path) > 1:
|
path_dx = self.path[1][0] - self.path[0][0]
|
path_dy = self.path[1][1] - self.path[0][1]
|
desired_start_heading = atan2(path_dy, path_dx)
|
else:
|
desired_start_heading = 0.0
|
heading_err_final = wrap_angle(desired_start_heading - self.heading)
|
# 如果最终朝向对齐或非常接近位置,则切换到跟踪
|
# 只有同时满足位置与航向条件才算真正到达起点,避免“偏近+朝向差”提前切换
|
if (dist_to_start < self.start_tolerance) and (abs(heading_err_final) < self.start_heading_tolerance):
|
self.stage = self.STAGE_FOLLOW_PATH
|
self.goto_substage = None
|
# 保持 current_target_idx = 0,让 pure pursuit 自然选择前视点
|
self.current_target_idx = 0
|
# 重置速度为较小值,避免 lookahead 过大
|
self.current_speed = self.start_min_speed
|
# 小幅前进以确保不卡在起点
|
f_sig = int(round((self.start_min_speed / self.max_forward_mps) * 100))
|
target_x, target_y = self.path[0] # 使用起点作为初始目标
|
target_dist = hypot(target_x - x, target_y - y)
|
target_idx = 0
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},'
|
f'0.000,{heading_err_final:.3f},{f_sig},0,{self.start_min_speed:.3f},{self.stage},reached_start\n')
|
self.log_file.flush()
|
return {'forward': f_sig, 'turn': 0, 'info': {'status': 'reached_start', 'stage': self.stage, 'advanced_idx': self.current_target_idx}}
|
else:
|
# 在起点附近但朝向不对,进行原地修正(停止前进)
|
sign = 1 if heading_err_final >= 0 else -1
|
yawrate_cmd = sign * min(self.max_yawrate * 0.6, max(0.05, abs(heading_err_final) * 0.8))
|
turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100))
|
turn_signal = max(-100, min(100, turn_signal))
|
target_x, target_y = start_point
|
target_dist = hypot(target_x - x, target_y - y)
|
target_idx = 0
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},'
|
f'0.000,{heading_err_final:.3f},0,{turn_signal},{0.000:.3f},{self.stage},align_at_start\n')
|
self.log_file.flush()
|
return {'forward': 0, 'turn': turn_signal, 'info': {'status': 'align_at_start', 'heading_err': heading_err_final}}
|
|
# 计算时间增量
|
if self.last_time is None:
|
dt = 0.014 # 约74Hz
|
else:
|
dt = timestamp - self.last_time if timestamp else 0.014
|
self.last_time = timestamp if timestamp else (self.last_time + dt if self.last_time else 0.0)
|
|
# 计算横向误差(使用简化方法)
|
x, y = self.pos
|
xte, proj_x, proj_y = self._compute_xte_simple(self.pos)
|
|
# 预先计算到终点的距离
|
final_dist = hypot(self.path[-1][0] - x, self.path[-1][1] - y)
|
|
# ========== 拐点接近和转向模式检测 ==========
|
# 检查是否需要进入拐点接近或转向模式
|
if not self.waypoint_approach_mode and not self.waypoint_turn_mode:
|
# 在原始路径中查找最近的拐点
|
waypoint_idx, waypoint_dist = self._find_nearest_original_waypoint()
|
|
# 如果拐点在接近距离内(2m)且未转向过,进入接近模式
|
# 注意:增大检测距离,确保在短边也能提前进入接近模式
|
if (waypoint_idx is not None and
|
waypoint_dist < 2.0 and
|
waypoint_idx not in self.waypoint_turned):
|
self.waypoint_approach_mode = True
|
self.waypoint_turn_target_idx = waypoint_idx # 记录当前接近的拐点
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{self.original_path[waypoint_idx][0]:.3f},'
|
f'{self.original_path[waypoint_idx][1]:.3f},'
|
f'{waypoint_dist:.3f},{waypoint_idx},'
|
f'{xte:.3f},0.000,0,0,0.000,{self.stage},enter_waypoint_approach\n')
|
self.log_file.flush()
|
|
# ========== 拐点接近模式:精确接近拐点位置 ==========
|
if self.waypoint_approach_mode:
|
waypoint_x, waypoint_y = self.original_path[self.waypoint_turn_target_idx]
|
dx = waypoint_x - x
|
dy = waypoint_y - y
|
waypoint_dist = hypot(dx, dy)
|
|
# 检查是否到达拐点(12cm容差)
|
if waypoint_dist < self.waypoint_turn_tolerance:
|
# 到达拐点,切换到转向模式
|
self.waypoint_approach_mode = False
|
self.waypoint_turn_mode = True
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{waypoint_x:.3f},{waypoint_y:.3f},'
|
f'{waypoint_dist:.3f},{self.waypoint_turn_target_idx},'
|
f'{xte:.3f},0.000,0,0,0.000,{self.stage},reached_waypoint\n')
|
self.log_file.flush()
|
else:
|
# 直接朝向拐点前进(忽略路径,直接目标点导航)
|
desired_heading_to_waypoint = atan2(dy, dx)
|
heading_err_to_waypoint = wrap_angle(desired_heading_to_waypoint - self.heading)
|
|
# 速度控制:接近时逐渐减速
|
if waypoint_dist < 0.5: # 50cm内减速
|
speed_ratio = waypoint_dist / 0.5
|
target_speed = self.start_min_speed * (0.4 + 0.6 * speed_ratio)
|
else:
|
target_speed = self.base_speed * 0.7
|
|
# 航向误差也会降低速度
|
heading_ratio = abs(heading_err_to_waypoint) / pi
|
err_scale = 0.5 + 0.5 * (1.0 - heading_ratio)
|
target_speed = target_speed * err_scale
|
target_speed = max(self.start_min_speed * 0.5, target_speed)
|
|
# 转向控制
|
yawrate_cmd = self.k_heading * heading_err_to_waypoint * 1.3 # 稍微提高转向增益
|
|
forward_signal = int(round((target_speed / self.max_forward_mps) * 100))
|
forward_signal = max(-100, min(100, forward_signal))
|
turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100))
|
turn_signal = max(-100, min(100, turn_signal))
|
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{waypoint_x:.3f},{waypoint_y:.3f},'
|
f'{waypoint_dist:.3f},{self.waypoint_turn_target_idx},'
|
f'{xte:.3f},{heading_err_to_waypoint:.3f},{forward_signal},{turn_signal},{target_speed:.3f},{self.stage},approaching_waypoint\n')
|
self.log_file.flush()
|
|
return {'forward': forward_signal, 'turn': turn_signal, 'info': {
|
'status': 'approaching_waypoint',
|
'waypoint_dist': waypoint_dist,
|
'heading_err': heading_err_to_waypoint,
|
'target_speed': target_speed
|
}}
|
|
# ========== 拐点转向模式:原地转向 ==========
|
if self.waypoint_turn_mode:
|
# 获取目标航向(下一段的方向)
|
target_heading = self._get_target_heading_at_waypoint(self.waypoint_turn_target_idx)
|
heading_err = wrap_angle(target_heading - self.heading)
|
|
# 检查是否转向完成
|
if abs(heading_err) < self.waypoint_turn_heading_tol:
|
# 转向完成,退出转向模式
|
self.waypoint_turned.add(self.waypoint_turn_target_idx) # 记录已转向
|
self.waypoint_turn_mode = False
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{self.original_path[self.waypoint_turn_target_idx][0]:.3f},'
|
f'{self.original_path[self.waypoint_turn_target_idx][1]:.3f},'
|
f'0.000,{self.waypoint_turn_target_idx},'
|
f'{xte:.3f},{heading_err:.3f},0,0,0.000,{self.stage},exit_waypoint_turn\n')
|
self.log_file.flush()
|
else:
|
# 原地转向:forward=0, turn基于航向误差
|
yawrate_cmd = self.k_heading * heading_err * 1.2 # 稍微提高转向增益
|
turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100))
|
turn_signal = max(-100, min(100, turn_signal))
|
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{self.original_path[self.waypoint_turn_target_idx][0]:.3f},'
|
f'{self.original_path[self.waypoint_turn_target_idx][1]:.3f},'
|
f'0.000,{self.waypoint_turn_target_idx},'
|
f'{xte:.3f},{heading_err:.3f},0,{turn_signal},0.000,{self.stage},waypoint_turning\n')
|
self.log_file.flush()
|
|
return {'forward': 0, 'turn': turn_signal, 'info': {
|
'status': 'waypoint_turning',
|
'heading_err': heading_err,
|
'target_heading': target_heading
|
}}
|
|
# 动态前视距离:基于当前速度和横向误差
|
actual_speed = abs(self.current_speed)
|
xte_mag = abs(xte)
|
|
# lookahead = lookahead_min + speed_component - xte_penalty
|
base_lookahead = self.lookahead_min + self.lookahead_time * actual_speed - 0.3 * min(xte_mag, 0.5)
|
lookahead = max(self.lookahead_min, min(self.lookahead_max, base_lookahead))
|
|
# 恢复模式判定
|
dist_to_path = abs(xte)
|
if not self.in_recovery and dist_to_path > self.recovery_dist:
|
self.in_recovery = True
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{proj_x:.3f},{proj_y:.3f},{dist_to_path:.3f},{self.nearest_path_idx},'
|
f'{xte:.3f},0.000,0,0,0.000,{self.stage},enter_recovery\n')
|
self.log_file.flush()
|
|
if self.in_recovery and dist_to_path < self.recovery_exit_dist:
|
self.in_recovery = False
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{proj_x:.3f},{proj_y:.3f},{dist_to_path:.3f},{self.nearest_path_idx},'
|
f'{xte:.3f},0.000,0,0,0.000,{self.stage},exit_recovery\n')
|
self.log_file.flush()
|
|
# ========== 标准 Pure Pursuit 控制 ==========
|
# 选择前视点
|
(lx, ly), path_curvature, lookahead_idx, nearest_idx = self._find_lookahead_point(lookahead)
|
self.current_target_idx = lookahead_idx
|
|
# 计算前视点方向
|
path_heading = atan2(ly - y, lx - x)
|
|
# 横向误差修正
|
xte_normalized = max(-1.0, min(1.0, xte / self.max_xte))
|
xte_correction = -self.k_xte * xte_normalized * self.xte_gain_scale
|
|
# 期望航向 = 前视方向 + xte修正
|
desired_heading = wrap_angle(path_heading + xte_correction)
|
heading_err = wrap_angle(desired_heading - self.heading)
|
|
# 计算航向误差的变化率
|
heading_err_d = (heading_err - self.last_heading_err) / dt if dt > 0 else 0
|
self.last_heading_err = heading_err
|
|
# 积分项计算(带防饱和)
|
self.heading_err_sum += heading_err * dt
|
self.xte_sum += xte * dt
|
self.heading_err_sum = max(-self.max_sum, min(self.max_sum, self.heading_err_sum))
|
self.xte_sum = max(-self.max_sum, min(self.max_sum, self.xte_sum))
|
|
# PID控制
|
yawrate_cmd = (self.k_heading * heading_err +
|
self.k_heading_d * heading_err_d +
|
self.k_heading_i * self.heading_err_sum +
|
self.k_xte_i * self.xte_sum)
|
|
# 速度控制:基于曲率、航向误差、横向误差
|
# 1) 曲率缩放
|
curv_scale = 1.0 / (1.0 + abs(path_curvature) * self.curv_gain_scale)
|
|
# 2) 航向误差缩放
|
heading_err_abs = abs(heading_err)
|
err_scale = 1.0 / (1.0 + heading_err_abs * self.heading_err_gain_scale)
|
|
# 3) 横向误差缩放
|
xte_abs = abs(xte)
|
xte_scale = 1.0 / (1.0 + xte_abs * self.xte_gain_scale)
|
|
# 混合缩放:几何平均 + 最小值
|
geom_scale = (curv_scale * err_scale * xte_scale) ** (1.0/3.0)
|
min_scale = min(curv_scale, err_scale, xte_scale)
|
blend_scale = 0.8 * geom_scale + 0.2 * min_scale
|
|
target_speed = self.base_speed * blend_scale
|
|
# 滤波和限幅
|
self.yawrate_filter = (self.yawrate_alpha * yawrate_cmd +
|
(1-self.yawrate_alpha) * self.yawrate_filter)
|
max_yawrate = self.max_yawrate
|
yawrate_cmd = max(-max_yawrate, min(max_yawrate, self.yawrate_filter))
|
|
# 恢复模式覆盖(优先级更高)
|
if self.in_recovery:
|
# 恢复模式:朝向投影点
|
path_heading = atan2(proj_y - y, proj_x - x)
|
xte_normalized = max(-1.0, min(1.0, xte / self.max_xte))
|
xte_correction = -self.k_xte * xte_normalized * self.xte_gain_scale
|
desired_heading = wrap_angle(path_heading + xte_correction)
|
heading_err = wrap_angle(desired_heading - self.heading)
|
yawrate_cmd = self.k_heading * heading_err
|
target_speed = self.recovery_speed
|
# 基于距离终点的距离调整速度
|
in_slow_zone = final_dist < self.slow_zone
|
in_final_approach = final_dist < self.final_approach_dist
|
|
# 错误恢复逻辑
|
if abs(xte) > self.large_err_threshold * 1.5 or abs(heading_err) > pi/2.5:
|
self.consecutive_large_err += 1
|
if self.consecutive_large_err > self.max_large_err_count:
|
# 重置到安全状态
|
self.consecutive_large_err = 0
|
self.heading_err_sum = 0
|
self.xte_sum = 0
|
else:
|
self.consecutive_large_err = 0
|
|
# 接近终点时调整速度
|
if not self.in_recovery:
|
if in_slow_zone:
|
dist_scale = max(0.3, min(1.0, final_dist / self.slow_zone))
|
if in_final_approach:
|
dist_scale *= max(0.4, final_dist / self.final_approach_dist)
|
target_speed *= dist_scale
|
|
# 在终点附近增加精度
|
if in_final_approach:
|
target_speed *= 0.6
|
if abs(xte) > 0.15 or abs(heading_err) > pi/10:
|
target_speed *= 0.7
|
|
# 强制最小速度(避免完全停滞)
|
if self.stage == self.STAGE_FOLLOW_PATH and not self.in_recovery:
|
dynamic_min = self.min_follow_speed if in_slow_zone else self.min_follow_speed * 1.5
|
if abs(heading_err) < 3.0 and target_speed < dynamic_min:
|
target_x, target_y = lx, ly
|
target_dist = hypot(target_x - x, target_y - y)
|
self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},'
|
f'{xte:.3f},{heading_err:.3f},0,0,'
|
f'{dynamic_min:.3f},{self.stage},enforce_min_speed\n')
|
self.log_file.flush()
|
target_speed = dynamic_min
|
|
# 软启动:限制加速度
|
accel = self.accel * (0.5 if in_slow_zone else 1.0)
|
speed_change = target_speed - self.current_speed
|
max_change = accel * dt
|
self.current_speed += max(-max_change, min(max_change, speed_change))
|
|
# 倒车控制
|
if self.stage == self.STAGE_GOTO_START:
|
if abs(heading_err) > 2.9:
|
self.current_speed = -self.max_reverse_mps * 0.5
|
else:
|
if abs(heading_err) > 2.9 and not self.in_recovery:
|
self.current_speed = -self.max_reverse_mps * 0.4
|
if self.current_speed < 0 and abs(heading_err) < 2.5:
|
self.current_speed = 0.0
|
|
# 终点检测(简化,只基于距离和点索引)
|
if final_dist < self.goal_tolerance:
|
self.finished = True
|
return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}}
|
|
# 扩展完成条件:接近终点且误差小
|
if (final_dist < self.goal_tolerance * 1.5 and
|
self.current_target_idx >= self.n - 2 and
|
abs(heading_err) < 0.3 and abs(xte) < 0.2):
|
self.finished = True
|
return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}}
|
|
# map to signals [-100,100]
|
# 记录映射前的速度信息以便诊断
|
log_t = timestamp if timestamp is not None else time.time()
|
pre_map_speed = self.current_speed
|
# 诊断日志(简化)
|
target_x, target_y = lx, ly
|
target_dist = hypot(target_x - x, target_y - y)
|
pre_map_forward = int(round((pre_map_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0
|
pre_map_forward = max(-100, min(100, pre_map_forward))
|
pre_map_turn = int(round((yawrate_cmd / self.max_yawrate) * 100)) if self.max_yawrate != 0 else 0
|
pre_map_turn = max(-100, min(100, pre_map_turn))
|
self.log_file.write(f'{log_t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},'
|
f'{xte:.3f},{heading_err:.3f},{pre_map_forward},{pre_map_turn},{pre_map_speed:.3f},'
|
f'{self.stage},pre_map\n')
|
self.log_file.flush()
|
|
forward_signal = int(round((self.current_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0
|
forward_signal = max(-100, min(100, forward_signal))
|
|
# 强制最小前进信号(防止完全停止)
|
if forward_signal == 0 and not self.finished and abs(heading_err) < 3.0:
|
forced_signal = int(round((self.min_follow_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0
|
if forced_signal != 0:
|
target_x, target_y = lx, ly
|
target_dist = hypot(target_x - x, target_y - y)
|
self.log_file.write(f'{log_t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},'
|
f'{xte:.3f},{heading_err:.3f},{forced_signal},{int(round((yawrate_cmd / self.max_yawrate) * 100))},'
|
f'{self.min_follow_speed:.3f},{self.stage},force_min_forward\n')
|
self.log_file.flush()
|
forward_signal = forced_signal
|
self.current_speed = self.min_follow_speed
|
|
turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) if self.max_yawrate != 0 else 0
|
turn_signal = max(-100, min(100, turn_signal))
|
|
status = 'finished' if self.finished else 'running'
|
info = {
|
'status': status,
|
'pos': (x, y),
|
'heading': self.heading,
|
'desired_heading': desired_heading,
|
'heading_err': heading_err,
|
'current_speed': self.current_speed,
|
'yawrate_cmd': yawrate_cmd,
|
'target_xy': (lx, ly),
|
'path_curvature': path_curvature,
|
'final_dist': final_dist,
|
}
|
|
# 简化的日志记录
|
t = log_t
|
target_x, target_y = lx, ly
|
target_dist = hypot(target_x - x, target_y - y)
|
self.log_file.write(f'{t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
|
f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},'
|
f'{xte:.3f},{heading_err:.3f},{forward_signal},{turn_signal},{self.current_speed:.3f},'
|
f'{self.stage},{status}\n')
|
self.log_file.flush()
|
|
return {'forward': forward_signal, 'turn': turn_signal, 'info': info}
|
|
|
if __name__ == '__main__':
|
# quick manual test
|
import json
|
path = [[0,0],[5,0],[10,0]]
|
c = MowerController(path)
|
c.update_gps((0, -1.0), 0.0)
|
for i in range(20):
|
out = c.compute_control()
|
print(i, out)
|
# fake forward motion
|
f = out['forward'] / 100.0 * c.max_forward_mps
|
hd = c.heading + (out['turn'] / 100.0) * c.max_yawrate * 0.1
|
px = c.pos[0] + f * cos(hd) * 0.1
|
py = c.pos[1] + f * sin(hd) * 0.1
|
c.update_gps((px,py), hd)
|