""" MowerController 控制器假设与契约: - 坐标系:XY 平面单位为米(RTK 提供),Z 忽略用于平面跟踪。 - 航向(heading):弧度,范围任意,控制器会处理环绕。 - GPS 频率(位置/速度/heading/attitude)约 10Hz;IMU(加速度、角速度)100Hz。 - 控制输出频率:74Hz,返回两个信号:forward(-100..100,对应 -1..+1 m/s)和 turn(-100..100,对应 -pi/2..+pi/2 rad/s,正为左转)。 - 路径文件:JSON 列表 [[x,y],[x,y],...] 设计说明: - 使用纯追踪(pure pursuit)方法选取前视点,并基于角度误差使用 P 控制计算期望偏航速率(rad/s)。 - 前进速度与偏航误差耦合:偏航误差大时降低前进速度以保证可转弯能力。 接口: - controller = MowerController(path_points, params) - controller.update_gps(pos_xy, heading, vel_xy, timestamp) - controller.update_imu(accel3, gyro3, timestamp) - controller.compute_control(timestamp) -> {'forward': int, 'turn': int, 'info': {...}} """ from math import atan2, hypot, sin, cos, pi, acos import time def wrap_angle(a): # wrap to [-pi, pi) while a >= pi: a -= 2 * pi while a < -pi: a += 2 * pi return a class MowerController: def __init__(self, path_points, params=None): # path_points: list of [x,y] # 原始路径点 original_path = [tuple(p) for p in path_points] if len(original_path) == 0: raise ValueError("path_points cannot be empty") # 保存原始路径用于拐点检测 self.original_path = original_path # 路径加密(densify):避免首段过长导致纯追踪前视点跳到远处 p_params = params or {} max_seg_len = p_params.get('max_segment_length', 0.6) # 默认将长段切分为 <=0.6m densified = [] for i in range(len(original_path) - 1): x1, y1 = original_path[i] x2, y2 = original_path[i + 1] densified.append((x1, y1)) dx = x2 - x1 dy = y2 - y1 seg_len = hypot(dx, dy) if seg_len > max_seg_len and seg_len > 1e-6: # 需要插入若干中间点(不包含终点,终点下一循环添加) steps = int(seg_len // max_seg_len) # 使用均匀线性插值(1..steps),防止首段过长纯追踪直接跳过 for k in range(1, steps + 1): ratio = k / (steps + 1) densified.append((x1 + dx * ratio, y1 + dy * ratio)) # 添加最后一个点 densified.append(original_path[-1]) self.path = densified self.n = len(self.path) # 预计算每段长度及累积长度用于进度计算 self.segment_lengths = [] self.cum_lengths = [0.0] for i in range(self.n - 1): ax, ay = self.path[i] bx, by = self.path[i+1] L = hypot(bx-ax, by-ay) self.segment_lengths.append(L) self.cum_lengths.append(self.cum_lengths[-1] + L) self.total_length = self.cum_lengths[-1] if self.cum_lengths else 0.0 # 进度单调性保护变量 self.last_progress = 0.0 # parameters and gains p = params or {} self.max_forward_mps = p.get('max_forward_mps', 0.6) # 更保守的速度 self.max_reverse_mps = p.get('max_reverse_mps', 0.2) # 保守的倒车速度 self.max_yawrate = p.get('max_yawrate', pi/4) # 更保守的转向速率 # 软启动和加减速 self.accel = p.get('accel', 0.2) # m/s^2 self.current_speed_target = 0.0 # 目标速度 self.current_speed = 0.0 # 实际速度 # 控制增益(提高响应速度和精度) self.k_heading = p.get('k_heading', 1.8) # 大幅提高转向增益,加快航向收敛 self.k_heading_d = p.get('k_heading_d', 0.25) # 增加微分作用,抑制振荡 self.k_xte = p.get('k_xte', 1.2) # 大幅提高横向误差增益,激进纠正偏离 self.k_curv = p.get('k_curv', 0.2) # 降低曲率前馈增益 self.base_speed = p.get('base_speed', 0.5) # 提高基础速度 # 路径跟踪参数(减小lookahead以提高精度) self.lookahead_time = p.get('lookahead_time', 2.0) # 减小前视时间 # 减小最小前视距离以提高轨迹精度 self.lookahead_min = p.get('lookahead_min', 0.35) self.lookahead_max = p.get('lookahead_max', 1.5) # 减小最大前视距离 self.goal_tolerance = p.get('goal_tolerance', 0.3) # m self.slow_zone = p.get('slow_zone', 1.5) # 减小减速区域 self.final_approach_dist = p.get('final_approach_dist', 0.8) # 减小精确接近区域 # 新增:横向误差相关 self.max_xte = p.get('max_xte', 1.0) # 减小最大横向误差限制,提高精度 self.xte_gain_scale = p.get('xte_gain_scale', 1.0) # 移除横向误差缩放,使用完整增益 # 速度控制缩放参数 self.curv_gain_scale = p.get('curv_gain_scale', 2.0) # 曲率缩放增益 self.heading_err_gain_scale = p.get('heading_err_gain_scale', 1.5) # 航向误差缩放增益 # 状态估计和滤波 self.yawrate_filter = 0.0 self.yawrate_alpha = p.get('yawrate_alpha', 0.90) # 进一步增加平滑度 self.last_heading_err = 0.0 # 用于D控制 self.last_time = None # 用于计算dt self.last_curv = 0.0 # 用于平滑曲率变化 # 累积误差控制 self.heading_err_sum = 0.0 # 航向误差积分 self.xte_sum = 0.0 # 横向误差积分 self.max_sum = 2.0 # 积分限幅 self.k_heading_i = p.get('k_heading_i', 0.01) # 航向误差积分增益 self.k_xte_i = p.get('k_xte_i', 0.01) # 横向误差积分增益 # 错误恢复 self.consecutive_large_err = 0 # 连续大误差计数 self.large_err_threshold = p.get('large_err_threshold', 0.5) # 大误差阈值 self.max_large_err_count = 10 # 最大连续大误差次数 # (日志文件将在下面统一创建) # state (most recent readings) self.pos = None # (x,y) self.heading = None # rad self.vel = (0.0, 0.0) # vx, vy in m/s (ground frame) self.imu_accel = None self.imu_gyro = None # control stages self.STAGE_GOTO_START = 'goto_start' # 移动到起点阶段 self.STAGE_FOLLOW_PATH = 'follow_path' # 跟随路径阶段 self.stage = self.STAGE_GOTO_START # 起点接近参数 - 收紧以确保真正到达起点 self.start_tolerance = p.get('start_tolerance', 0.12) # 位置容差(收紧到12cm) self.start_heading_tolerance = p.get('start_heading_tolerance', 0.01) # 航向容差 (~14°,收紧) self.start_approach_dist = p.get('start_approach_dist', 1.5) # 减速距离 self.start_slow_speed = p.get('start_slow_speed', 0.25) # 接近速度 self.start_min_speed = p.get('start_min_speed', 0.25) # 提高最小速度 # 新增:到起点时先转向再前进的阈值(若与起点方向误差大于此阈值,则先转向) self.start_turn_first_heading = p.get('start_turn_first_heading', pi/50) # ~15度 # 路径跟随阶段的最小前进速度(防止映射/缩放导致速度归零) self.min_follow_speed = p.get('min_follow_speed', 0.08) # Recovery 模式参数 self.in_recovery = False self.recovery_dist = p.get('recovery_dist', 2.0) self.recovery_exit_dist = p.get('recovery_exit_dist', 0.8) self.recovery_speed = p.get('recovery_speed', 0.55) # progress - 只使用点索引,不使用segment self.current_target_idx = 0 self.nearest_path_idx = 0 # 最近路径点索引 self.finished = False # goto_start 子状态:'rotate'|'move'|'align',保证顺序执行且不反复切换 self.goto_substage = 'rotate' # 拐点转向参数 self.waypoint_approach_mode = False # 是否在接近拐点模式 self.waypoint_turn_mode = False # 是否在拐点转向模式 self.waypoint_turn_target_idx = None # 当前转向的拐点在原始路径中的索引 self.waypoint_turn_tolerance = p.get('waypoint_turn_tolerance', 0.12) # 到达拐点的距离容差(12cm) self.waypoint_approach_dist = p.get('waypoint_approach_dist', 0.8) # 开始减速接近拐点的距离(80cm) self.waypoint_turn_heading_tol = p.get('waypoint_turn_heading_tol', 0.1) # 转向完成的航向容差(~6度) self.waypoint_turned = set() # 记录已经转向过的拐点,避免重复转向 # 创建日志文件 self.log_file = open('controller_log.csv', 'w') # 简化日志格式:移除segment相关 self.log_file.write('t,x,y,heading,target_x,target_y,target_dist,target_idx,xte,heading_err,forward,turn,speed,stage,status\n') def update_gps(self, pos_xy, heading, vel_xy=None, timestamp=None): """Update GPS-like measurements at ~10Hz. pos_xy: (x,y) in meters heading: radians vel_xy: (vx, vy) in m/s in global frame (optional) """ self.pos = tuple(pos_xy) self.heading = float(heading) if vel_xy is not None: self.vel = tuple(vel_xy) def update_imu(self, accel3, gyro3, timestamp=None): """Update IMU (100Hz). accel3 and gyro3 are 3-element tuples.""" self.imu_accel = tuple(accel3) self.imu_gyro = tuple(gyro3) def _calc_xte(self, p1, p2, pos): """Calculate cross track error (perpendicular distance to line segment)""" x, y = pos x1, y1 = p1 x2, y2 = p2 # Convert to local coordinates dx = x2 - x1 dy = y2 - y1 l2 = dx*dx + dy*dy if l2 == 0: # points are identical return hypot(x - x1, y - y1) # Project point onto line t = ((x - x1)*dx + (y - y1)*dy) / l2 if t < 0: # before start return hypot(x - x1, y - y1) elif t > 1: # after end return hypot(x - x2, y - y2) else: # projection point lies on segment px = x1 + t*dx py = y1 + t*dy xte = hypot(x - px, y - py) # Determine sign based on which side of the line the point is on sign = 1 if (x - px)*dy - (y - py)*dx > 0 else -1 return sign * xte def _is_waypoint_turn_needed(self, idx): """ 检测路径点idx是否是拐点(需要原地转向) 拐点定义:前后两段路径方向变化超过45度 """ if idx <= 0 or idx >= len(self.path) - 1: return False # 前一段方向 p_prev = self.path[idx - 1] p_curr = self.path[idx] p_next = self.path[idx + 1] dx1 = p_curr[0] - p_prev[0] dy1 = p_curr[1] - p_prev[1] dx2 = p_next[0] - p_curr[0] dy2 = p_next[1] - p_curr[1] # 计算两段方向 heading1 = atan2(dy1, dx1) heading2 = atan2(dy2, dx2) # 方向变化角度 angle_diff = abs(wrap_angle(heading2 - heading1)) # 如果方向变化超过45度,认为是拐点 return angle_diff > pi / 4 def _find_nearest_original_waypoint(self): """ 在原始路径中找到最接近当前位置的拐点 Returns: (waypoint_idx, distance) 或 (None, None) """ if self.pos is None: return None, None x, y = self.pos nearest_waypoint_idx = None nearest_dist = float('inf') # 遍历原始路径的所有拐点 for i in range(1, len(self.original_path) - 1): # 检查是否是拐点 p_prev = self.original_path[i - 1] p_curr = self.original_path[i] p_next = self.original_path[i + 1] dx1 = p_curr[0] - p_prev[0] dy1 = p_curr[1] - p_prev[1] dx2 = p_next[0] - p_curr[0] dy2 = p_next[1] - p_curr[1] heading1 = atan2(dy1, dx1) heading2 = atan2(dy2, dx2) angle_diff = abs(wrap_angle(heading2 - heading1)) # 如果是拐点(方向变化>45度) if angle_diff > pi / 4: dist = hypot(p_curr[0] - x, p_curr[1] - y) if dist < nearest_dist: nearest_dist = dist nearest_waypoint_idx = i return nearest_waypoint_idx, nearest_dist def _get_target_heading_at_waypoint(self, idx): """获取原始路径点idx处的目标航向(下一段的方向)""" if idx >= len(self.original_path) - 1: # 最后一个点,返回前一段方向 if idx > 0: dx = self.original_path[idx][0] - self.original_path[idx-1][0] dy = self.original_path[idx][1] - self.original_path[idx-1][1] return atan2(dy, dx) return 0.0 # 返回到下一个点的方向 dx = self.original_path[idx + 1][0] - self.original_path[idx][0] dy = self.original_path[idx + 1][1] - self.original_path[idx][1] return atan2(dy, dx) def _find_lookahead_point(self, lookahead_dist): """ 简化的Pure Pursuit前视点查找 - 只使用点索引 从nearest_path_idx开始向前搜索,找到第一个距离>=lookahead_dist的点 Returns: (lookahead_xy, curvature, lookahead_idx, nearest_idx) """ if self.pos is None: return self.path[0], 0.0, 0, 0 x, y = self.pos # 1. 找最近点 - 如果离路径很远,使用全局搜索;否则使用窗口搜索 # 先检查当前nearest_path_idx的距离 px, py = self.path[self.nearest_path_idx] current_nearest_dist = hypot(px - x, py - y) # 如果距离>1.0m,使用全局搜索 if current_nearest_dist > 1.0: search_start = 0 search_end = len(self.path) else: # 使用窗口搜索 search_start = max(0, self.nearest_path_idx - 5) search_end = min(len(self.path), self.nearest_path_idx + 30) nearest_idx = self.nearest_path_idx nearest_dist = float('inf') for i in range(search_start, search_end): px, py = self.path[i] d = hypot(px - x, py - y) if d < nearest_dist: nearest_dist = d nearest_idx = i self.nearest_path_idx = nearest_idx # 更新最近点索引 # 2. 从nearest_idx开始向前搜索lookahead点 lookahead_idx = nearest_idx for i in range(nearest_idx, len(self.path)): px, py = self.path[i] d = hypot(px - x, py - y) if d >= lookahead_dist: lookahead_idx = i break else: # 没找到,使用最后一个点 lookahead_idx = len(self.path) - 1 # 3. 估计曲率(基于nearest_idx附近的点) curvature = 0.0 if 1 <= nearest_idx < len(self.path) - 1: p1 = self.path[nearest_idx - 1] p2 = self.path[nearest_idx] p3 = self.path[nearest_idx + 1] v1 = (p2[0] - p1[0], p2[1] - p1[1]) v2 = (p3[0] - p2[0], p3[1] - p2[1]) l1 = hypot(v1[0], v1[1]) l2 = hypot(v2[0], v2[1]) if l1 > 1e-6 and l2 > 1e-6: dot = v1[0]*v2[0] + v1[1]*v2[1] cos_angle = max(-1.0, min(1.0, dot / (l1 * l2))) angle_change = abs(pi - acos(cos_angle)) curvature = angle_change / max(l1, 1e-6) return self.path[lookahead_idx], curvature, lookahead_idx, nearest_idx def _find_nearest_original_waypoint(self): """ 在原始路径中查找下一个未到达的拐点 按照路径顺序查找,不跳过 Returns: (waypoint_idx, distance) 或 (None, inf) 如果没有找到 """ if not hasattr(self, 'original_path') or len(self.original_path) < 3: return None, float('inf') x, y = self.pos # 遍历原始路径中的拐点(跳过起点和终点) for i in range(1, len(self.original_path) - 1): # 如果这个拐点已经转向过,跳过 if i in self.waypoint_turned: continue # 检查方向是否改变 prev_pt = self.original_path[i-1] curr_pt = self.original_path[i] next_pt = self.original_path[i+1] # 计算前后方向向量 dx1 = curr_pt[0] - prev_pt[0] dy1 = curr_pt[1] - prev_pt[1] dx2 = next_pt[0] - curr_pt[0] dy2 = next_pt[1] - curr_pt[1] # 计算角度 angle1 = atan2(dy1, dx1) angle2 = atan2(dy2, dx2) angle_change = abs(wrap_angle(angle2 - angle1)) # 如果角度变化大于45度,认为是拐点 if angle_change > pi / 4: # 找到第一个未完成的拐点,直接返回 dist = hypot(curr_pt[0] - x, curr_pt[1] - y) return i, dist return None, float('inf') def _get_target_heading_at_waypoint(self, waypoint_idx): """ 获取拐点处的目标航向(下一段路径的方向) Args: waypoint_idx: 拐点在原始路径中的索引 Returns: 目标航向(弧度) """ if waypoint_idx is None: return 0.0 # 默认向东 # 如果是最后一个点,使用前一段的方向(保持不变) if waypoint_idx >= len(self.original_path) - 1: if waypoint_idx > 0: prev_pt = self.original_path[waypoint_idx - 1] curr_pt = self.original_path[waypoint_idx] return atan2(curr_pt[1] - prev_pt[1], curr_pt[0] - prev_pt[0]) else: return 0.0 # 只有一个点,默认向东 curr_pt = self.original_path[waypoint_idx] next_pt = self.original_path[waypoint_idx + 1] dx = next_pt[0] - curr_pt[0] dy = next_pt[1] - curr_pt[1] return atan2(dy, dx) def _compute_xte_simple(self, pos): """ 简化的横向误差计算 - 基于最近点 Returns: (xte, proj_x, proj_y) """ x, y = pos nearest_idx = self.nearest_path_idx # 计算到最近点的向量 px, py = self.path[nearest_idx] # 如果有下一个点,计算到路径段的横向误差 if nearest_idx < len(self.path) - 1: nx, ny = self.path[nearest_idx + 1] # 段方向向量 dx = nx - px dy = ny - py seg_len2 = dx*dx + dy*dy if seg_len2 > 1e-9: # 投影到段上 t = ((x - px)*dx + (y - py)*dy) / seg_len2 t = max(0.0, min(1.0, t)) # 限制在[0,1] proj_x = px + t * dx proj_y = py + t * dy # 横向误差(带符号) xte_vec_x = x - proj_x xte_vec_y = y - proj_y xte_mag = hypot(xte_vec_x, xte_vec_y) # 使用叉乘判断左右 cross = xte_vec_x * dy - xte_vec_y * dx xte = xte_mag if cross > 0 else -xte_mag return xte, proj_x, proj_y # 否则直接用到最近点的距离 xte = hypot(x - px, y - py) return xte, px, py def compute_control(self, timestamp=None): """Compute forward and turn outputs mapped to [-100,100]. Returns dict with keys: forward(int), turn(int), info(dict) """ if self.finished: return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}} if self.pos is None or self.heading is None: return {'forward': 0, 'turn': 0, 'info': {'status': 'no_state'}} # ensure timestamp is numeric for logging/formatting if timestamp is None: timestamp = time.time() # 计算时间增量 if self.last_time is None: dt = 0.014 # 约74Hz else: dt = timestamp - self.last_time if timestamp else 0.014 self.last_time = timestamp if timestamp else (self.last_time + dt if self.last_time else 0.0) x, y = self.pos # 处理移动到起点阶段:使用持久子状态确保顺序执行:rotate -> move -> align if self.stage == self.STAGE_GOTO_START: start_point = self.path[0] dx = start_point[0] - x dy = start_point[1] - y dist_to_start = hypot(dx, dy) # 目标朝向指向起点 desired_heading_to_start = atan2(dy, dx) heading_err_to_start = wrap_angle(desired_heading_to_start - self.heading) # 初始化子状态(保守) if not hasattr(self, 'goto_substage') or self.goto_substage is None: self.goto_substage = 'rotate' # 子状态:rotate -> move -> align if self.goto_substage == 'rotate': # 原地转向直到朝向误差足够小(移除距离退出条件,确保先转好) if abs(heading_err_to_start) <= self.start_turn_first_heading: self.goto_substage = 'move' else: yawrate_cmd = self.k_heading * heading_err_to_start * 1.3 # 稍微提高转向速度 turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) turn_signal = max(-100, min(100, turn_signal)) forward_signal = 0 # target is start_point for goto_start substage target_x, target_y = start_point target_dist = hypot(target_x - x, target_y - y) target_idx = 0 self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},' f'0.000,{heading_err_to_start:.3f},{forward_signal},{turn_signal},0.000,{self.stage},rotate_to_start\n') self.log_file.flush() return {'forward': forward_signal, 'turn': turn_signal, 'info': { 'status': 'rotate_to_start', 'stage': self.stage, 'heading_err': heading_err_to_start, 'dist_to_start': dist_to_start }} if self.goto_substage == 'move': # 在移动子状态期间,不会回到 rotate 子状态,直到显式切换 desired_heading = desired_heading_to_start heading_err = wrap_angle(desired_heading - self.heading) # 如果已经接近起点则进入 align 子状态 if dist_to_start < self.start_tolerance: self.goto_substage = 'align' else: # 常规:面向起点并前进 if dist_to_start < self.start_approach_dist: dist_ratio = dist_to_start / self.start_approach_dist speed_scale = 0.4 + 0.6 * dist_ratio # 提高最小速度比例 base_target_speed = self.start_slow_speed + (self.base_speed - self.start_slow_speed) * dist_ratio else: speed_scale = 1.0 base_target_speed = min(self.base_speed * 0.8, 0.5) # 限制最大速度避免过冲 # 航向误差缩放更宽松 heading_ratio = abs(heading_err) / pi err_scale = 0.5 + 0.5 * (1.0 - heading_ratio) # 更宽松 target_speed = max(self.start_min_speed * 1.5, base_target_speed * speed_scale * err_scale) target_speed = max(target_speed, self.start_min_speed * 1.2) # 确保最小速度 yawrate_cmd = self.k_heading * heading_err forward_signal = int(round((target_speed / self.max_forward_mps) * 100)) forward_signal = max(-100, min(100, forward_signal)) turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) turn_signal = max(-100, min(100, turn_signal)) target_x, target_y = start_point target_dist = hypot(target_x - x, target_y - y) target_idx = 0 self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},' f'0.000,{heading_err:.3f},{forward_signal},{turn_signal},{target_speed:.3f},{self.stage},moving_to_start\n') self.log_file.flush() return {'forward': forward_signal, 'turn': turn_signal, 'info': { 'status': 'moving_to_start', 'stage': self.stage, 'dist_to_start': dist_to_start, 'heading_err': heading_err }} # align 子状态:到达起点后进行原地朝向修正,完成后切换到跟踪 if self.goto_substage == 'align': if len(self.path) > 1: path_dx = self.path[1][0] - self.path[0][0] path_dy = self.path[1][1] - self.path[0][1] desired_start_heading = atan2(path_dy, path_dx) else: desired_start_heading = 0.0 heading_err_final = wrap_angle(desired_start_heading - self.heading) # 如果最终朝向对齐或非常接近位置,则切换到跟踪 # 只有同时满足位置与航向条件才算真正到达起点,避免“偏近+朝向差”提前切换 if (dist_to_start < self.start_tolerance) and (abs(heading_err_final) < self.start_heading_tolerance): self.stage = self.STAGE_FOLLOW_PATH self.goto_substage = None # 保持 current_target_idx = 0,让 pure pursuit 自然选择前视点 self.current_target_idx = 0 # 重置速度为较小值,避免 lookahead 过大 self.current_speed = self.start_min_speed # 小幅前进以确保不卡在起点 f_sig = int(round((self.start_min_speed / self.max_forward_mps) * 100)) target_x, target_y = self.path[0] # 使用起点作为初始目标 target_dist = hypot(target_x - x, target_y - y) target_idx = 0 self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},' f'0.000,{heading_err_final:.3f},{f_sig},0,{self.start_min_speed:.3f},{self.stage},reached_start\n') self.log_file.flush() return {'forward': f_sig, 'turn': 0, 'info': {'status': 'reached_start', 'stage': self.stage, 'advanced_idx': self.current_target_idx}} else: # 在起点附近但朝向不对,进行原地修正(停止前进) sign = 1 if heading_err_final >= 0 else -1 yawrate_cmd = sign * min(self.max_yawrate * 0.6, max(0.05, abs(heading_err_final) * 0.8)) turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) turn_signal = max(-100, min(100, turn_signal)) target_x, target_y = start_point target_dist = hypot(target_x - x, target_y - y) target_idx = 0 self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},' f'0.000,{heading_err_final:.3f},0,{turn_signal},{0.000:.3f},{self.stage},align_at_start\n') self.log_file.flush() return {'forward': 0, 'turn': turn_signal, 'info': {'status': 'align_at_start', 'heading_err': heading_err_final}} # 计算时间增量 if self.last_time is None: dt = 0.014 # 约74Hz else: dt = timestamp - self.last_time if timestamp else 0.014 self.last_time = timestamp if timestamp else (self.last_time + dt if self.last_time else 0.0) # 计算横向误差(使用简化方法) x, y = self.pos xte, proj_x, proj_y = self._compute_xte_simple(self.pos) # 预先计算到终点的距离 final_dist = hypot(self.path[-1][0] - x, self.path[-1][1] - y) # ========== 拐点接近和转向模式检测 ========== # 检查是否需要进入拐点接近或转向模式 if not self.waypoint_approach_mode and not self.waypoint_turn_mode: # 在原始路径中查找最近的拐点 waypoint_idx, waypoint_dist = self._find_nearest_original_waypoint() # 如果拐点在接近距离内(2m)且未转向过,进入接近模式 # 注意:增大检测距离,确保在短边也能提前进入接近模式 if (waypoint_idx is not None and waypoint_dist < 2.0 and waypoint_idx not in self.waypoint_turned): self.waypoint_approach_mode = True self.waypoint_turn_target_idx = waypoint_idx # 记录当前接近的拐点 self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{self.original_path[waypoint_idx][0]:.3f},' f'{self.original_path[waypoint_idx][1]:.3f},' f'{waypoint_dist:.3f},{waypoint_idx},' f'{xte:.3f},0.000,0,0,0.000,{self.stage},enter_waypoint_approach\n') self.log_file.flush() # ========== 拐点接近模式:精确接近拐点位置 ========== if self.waypoint_approach_mode: waypoint_x, waypoint_y = self.original_path[self.waypoint_turn_target_idx] dx = waypoint_x - x dy = waypoint_y - y waypoint_dist = hypot(dx, dy) # 检查是否到达拐点(12cm容差) if waypoint_dist < self.waypoint_turn_tolerance: # 到达拐点,切换到转向模式 self.waypoint_approach_mode = False self.waypoint_turn_mode = True self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{waypoint_x:.3f},{waypoint_y:.3f},' f'{waypoint_dist:.3f},{self.waypoint_turn_target_idx},' f'{xte:.3f},0.000,0,0,0.000,{self.stage},reached_waypoint\n') self.log_file.flush() else: # 直接朝向拐点前进(忽略路径,直接目标点导航) desired_heading_to_waypoint = atan2(dy, dx) heading_err_to_waypoint = wrap_angle(desired_heading_to_waypoint - self.heading) # 速度控制:接近时逐渐减速 if waypoint_dist < 0.5: # 50cm内减速 speed_ratio = waypoint_dist / 0.5 target_speed = self.start_min_speed * (0.4 + 0.6 * speed_ratio) else: target_speed = self.base_speed * 0.7 # 航向误差也会降低速度 heading_ratio = abs(heading_err_to_waypoint) / pi err_scale = 0.5 + 0.5 * (1.0 - heading_ratio) target_speed = target_speed * err_scale target_speed = max(self.start_min_speed * 0.5, target_speed) # 转向控制 yawrate_cmd = self.k_heading * heading_err_to_waypoint * 1.3 # 稍微提高转向增益 forward_signal = int(round((target_speed / self.max_forward_mps) * 100)) forward_signal = max(-100, min(100, forward_signal)) turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) turn_signal = max(-100, min(100, turn_signal)) self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{waypoint_x:.3f},{waypoint_y:.3f},' f'{waypoint_dist:.3f},{self.waypoint_turn_target_idx},' f'{xte:.3f},{heading_err_to_waypoint:.3f},{forward_signal},{turn_signal},{target_speed:.3f},{self.stage},approaching_waypoint\n') self.log_file.flush() return {'forward': forward_signal, 'turn': turn_signal, 'info': { 'status': 'approaching_waypoint', 'waypoint_dist': waypoint_dist, 'heading_err': heading_err_to_waypoint, 'target_speed': target_speed }} # ========== 拐点转向模式:原地转向 ========== if self.waypoint_turn_mode: # 获取目标航向(下一段的方向) target_heading = self._get_target_heading_at_waypoint(self.waypoint_turn_target_idx) heading_err = wrap_angle(target_heading - self.heading) # 检查是否转向完成 if abs(heading_err) < self.waypoint_turn_heading_tol: # 转向完成,退出转向模式 self.waypoint_turned.add(self.waypoint_turn_target_idx) # 记录已转向 self.waypoint_turn_mode = False self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{self.original_path[self.waypoint_turn_target_idx][0]:.3f},' f'{self.original_path[self.waypoint_turn_target_idx][1]:.3f},' f'0.000,{self.waypoint_turn_target_idx},' f'{xte:.3f},{heading_err:.3f},0,0,0.000,{self.stage},exit_waypoint_turn\n') self.log_file.flush() else: # 原地转向:forward=0, turn基于航向误差 yawrate_cmd = self.k_heading * heading_err * 1.2 # 稍微提高转向增益 turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) turn_signal = max(-100, min(100, turn_signal)) self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{self.original_path[self.waypoint_turn_target_idx][0]:.3f},' f'{self.original_path[self.waypoint_turn_target_idx][1]:.3f},' f'0.000,{self.waypoint_turn_target_idx},' f'{xte:.3f},{heading_err:.3f},0,{turn_signal},0.000,{self.stage},waypoint_turning\n') self.log_file.flush() return {'forward': 0, 'turn': turn_signal, 'info': { 'status': 'waypoint_turning', 'heading_err': heading_err, 'target_heading': target_heading }} # 动态前视距离:基于当前速度和横向误差 actual_speed = abs(self.current_speed) xte_mag = abs(xte) # lookahead = lookahead_min + speed_component - xte_penalty base_lookahead = self.lookahead_min + self.lookahead_time * actual_speed - 0.3 * min(xte_mag, 0.5) lookahead = max(self.lookahead_min, min(self.lookahead_max, base_lookahead)) # 恢复模式判定 dist_to_path = abs(xte) if not self.in_recovery and dist_to_path > self.recovery_dist: self.in_recovery = True self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{proj_x:.3f},{proj_y:.3f},{dist_to_path:.3f},{self.nearest_path_idx},' f'{xte:.3f},0.000,0,0,0.000,{self.stage},enter_recovery\n') self.log_file.flush() if self.in_recovery and dist_to_path < self.recovery_exit_dist: self.in_recovery = False self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{proj_x:.3f},{proj_y:.3f},{dist_to_path:.3f},{self.nearest_path_idx},' f'{xte:.3f},0.000,0,0,0.000,{self.stage},exit_recovery\n') self.log_file.flush() # ========== 标准 Pure Pursuit 控制 ========== # 选择前视点 (lx, ly), path_curvature, lookahead_idx, nearest_idx = self._find_lookahead_point(lookahead) self.current_target_idx = lookahead_idx # 计算前视点方向 path_heading = atan2(ly - y, lx - x) # 横向误差修正 xte_normalized = max(-1.0, min(1.0, xte / self.max_xte)) xte_correction = -self.k_xte * xte_normalized * self.xte_gain_scale # 期望航向 = 前视方向 + xte修正 desired_heading = wrap_angle(path_heading + xte_correction) heading_err = wrap_angle(desired_heading - self.heading) # 计算航向误差的变化率 heading_err_d = (heading_err - self.last_heading_err) / dt if dt > 0 else 0 self.last_heading_err = heading_err # 积分项计算(带防饱和) self.heading_err_sum += heading_err * dt self.xte_sum += xte * dt self.heading_err_sum = max(-self.max_sum, min(self.max_sum, self.heading_err_sum)) self.xte_sum = max(-self.max_sum, min(self.max_sum, self.xte_sum)) # PID控制 yawrate_cmd = (self.k_heading * heading_err + self.k_heading_d * heading_err_d + self.k_heading_i * self.heading_err_sum + self.k_xte_i * self.xte_sum) # 速度控制:基于曲率、航向误差、横向误差 # 1) 曲率缩放 curv_scale = 1.0 / (1.0 + abs(path_curvature) * self.curv_gain_scale) # 2) 航向误差缩放 heading_err_abs = abs(heading_err) err_scale = 1.0 / (1.0 + heading_err_abs * self.heading_err_gain_scale) # 3) 横向误差缩放 xte_abs = abs(xte) xte_scale = 1.0 / (1.0 + xte_abs * self.xte_gain_scale) # 混合缩放:几何平均 + 最小值 geom_scale = (curv_scale * err_scale * xte_scale) ** (1.0/3.0) min_scale = min(curv_scale, err_scale, xte_scale) blend_scale = 0.8 * geom_scale + 0.2 * min_scale target_speed = self.base_speed * blend_scale # 滤波和限幅 self.yawrate_filter = (self.yawrate_alpha * yawrate_cmd + (1-self.yawrate_alpha) * self.yawrate_filter) max_yawrate = self.max_yawrate yawrate_cmd = max(-max_yawrate, min(max_yawrate, self.yawrate_filter)) # 恢复模式覆盖(优先级更高) if self.in_recovery: # 恢复模式:朝向投影点 path_heading = atan2(proj_y - y, proj_x - x) xte_normalized = max(-1.0, min(1.0, xte / self.max_xte)) xte_correction = -self.k_xte * xte_normalized * self.xte_gain_scale desired_heading = wrap_angle(path_heading + xte_correction) heading_err = wrap_angle(desired_heading - self.heading) yawrate_cmd = self.k_heading * heading_err target_speed = self.recovery_speed # 基于距离终点的距离调整速度 in_slow_zone = final_dist < self.slow_zone in_final_approach = final_dist < self.final_approach_dist # 错误恢复逻辑 if abs(xte) > self.large_err_threshold * 1.5 or abs(heading_err) > pi/2.5: self.consecutive_large_err += 1 if self.consecutive_large_err > self.max_large_err_count: # 重置到安全状态 self.consecutive_large_err = 0 self.heading_err_sum = 0 self.xte_sum = 0 else: self.consecutive_large_err = 0 # 接近终点时调整速度 if not self.in_recovery: if in_slow_zone: dist_scale = max(0.3, min(1.0, final_dist / self.slow_zone)) if in_final_approach: dist_scale *= max(0.4, final_dist / self.final_approach_dist) target_speed *= dist_scale # 在终点附近增加精度 if in_final_approach: target_speed *= 0.6 if abs(xte) > 0.15 or abs(heading_err) > pi/10: target_speed *= 0.7 # 强制最小速度(避免完全停滞) if self.stage == self.STAGE_FOLLOW_PATH and not self.in_recovery: dynamic_min = self.min_follow_speed if in_slow_zone else self.min_follow_speed * 1.5 if abs(heading_err) < 3.0 and target_speed < dynamic_min: target_x, target_y = lx, ly target_dist = hypot(target_x - x, target_y - y) self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},' f'{xte:.3f},{heading_err:.3f},0,0,' f'{dynamic_min:.3f},{self.stage},enforce_min_speed\n') self.log_file.flush() target_speed = dynamic_min # 软启动:限制加速度 accel = self.accel * (0.5 if in_slow_zone else 1.0) speed_change = target_speed - self.current_speed max_change = accel * dt self.current_speed += max(-max_change, min(max_change, speed_change)) # 倒车控制 if self.stage == self.STAGE_GOTO_START: if abs(heading_err) > 2.9: self.current_speed = -self.max_reverse_mps * 0.5 else: if abs(heading_err) > 2.9 and not self.in_recovery: self.current_speed = -self.max_reverse_mps * 0.4 if self.current_speed < 0 and abs(heading_err) < 2.5: self.current_speed = 0.0 # 终点检测(简化,只基于距离和点索引) if final_dist < self.goal_tolerance: self.finished = True return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}} # 扩展完成条件:接近终点且误差小 if (final_dist < self.goal_tolerance * 1.5 and self.current_target_idx >= self.n - 2 and abs(heading_err) < 0.3 and abs(xte) < 0.2): self.finished = True return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}} # map to signals [-100,100] # 记录映射前的速度信息以便诊断 log_t = timestamp if timestamp is not None else time.time() pre_map_speed = self.current_speed # 诊断日志(简化) target_x, target_y = lx, ly target_dist = hypot(target_x - x, target_y - y) pre_map_forward = int(round((pre_map_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0 pre_map_forward = max(-100, min(100, pre_map_forward)) pre_map_turn = int(round((yawrate_cmd / self.max_yawrate) * 100)) if self.max_yawrate != 0 else 0 pre_map_turn = max(-100, min(100, pre_map_turn)) self.log_file.write(f'{log_t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},' f'{xte:.3f},{heading_err:.3f},{pre_map_forward},{pre_map_turn},{pre_map_speed:.3f},' f'{self.stage},pre_map\n') self.log_file.flush() forward_signal = int(round((self.current_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0 forward_signal = max(-100, min(100, forward_signal)) # 强制最小前进信号(防止完全停止) if forward_signal == 0 and not self.finished and abs(heading_err) < 3.0: forced_signal = int(round((self.min_follow_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0 if forced_signal != 0: target_x, target_y = lx, ly target_dist = hypot(target_x - x, target_y - y) self.log_file.write(f'{log_t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},' f'{xte:.3f},{heading_err:.3f},{forced_signal},{int(round((yawrate_cmd / self.max_yawrate) * 100))},' f'{self.min_follow_speed:.3f},{self.stage},force_min_forward\n') self.log_file.flush() forward_signal = forced_signal self.current_speed = self.min_follow_speed turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) if self.max_yawrate != 0 else 0 turn_signal = max(-100, min(100, turn_signal)) status = 'finished' if self.finished else 'running' info = { 'status': status, 'pos': (x, y), 'heading': self.heading, 'desired_heading': desired_heading, 'heading_err': heading_err, 'current_speed': self.current_speed, 'yawrate_cmd': yawrate_cmd, 'target_xy': (lx, ly), 'path_curvature': path_curvature, 'final_dist': final_dist, } # 简化的日志记录 t = log_t target_x, target_y = lx, ly target_dist = hypot(target_x - x, target_y - y) self.log_file.write(f'{t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},' f'{xte:.3f},{heading_err:.3f},{forward_signal},{turn_signal},{self.current_speed:.3f},' f'{self.stage},{status}\n') self.log_file.flush() return {'forward': forward_signal, 'turn': turn_signal, 'info': info} if __name__ == '__main__': # quick manual test import json path = [[0,0],[5,0],[10,0]] c = MowerController(path) c.update_gps((0, -1.0), 0.0) for i in range(20): out = c.compute_control() print(i, out) # fake forward motion f = out['forward'] / 100.0 * c.max_forward_mps hd = c.heading + (out['turn'] / 100.0) * c.max_yawrate * 0.1 px = c.pos[0] + f * cos(hd) * 0.1 py = c.pos[1] + f * sin(hd) * 0.1 c.update_gps((px,py), hd)