yincheng.zhong
8 天以前 b9aca884f88abdda860d5c62be841a9e21ce68a1
添加python 算法仿真代码
已添加12个文件
已修改1个文件
1935 ■■■■■ 文件已修改
.gitignore 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/analyze_path_heading.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/check_approach_trajectory.py 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/check_path_heading.py 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/check_trajectory.py 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/debug_desired_heading.py 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/debug_pure_pursuit.py 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/debug_start_check.py 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/mower_controller.py 1013 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/path_planner.py 164 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/sim_offline.py 241 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/view_simulation.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
python/viz.py 147 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -14,3 +14,4 @@
/STM32H743/MDK-ARM/STM32H743/*.o
/STM32H743/MDK-ARM/STM32H743/*.htm
/STM32H743/MDK-ARM/STM32H743/*.axf
/STM32H743/MDK-ARM/STM32H743/*.hex
python/analyze_path_heading.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,30 @@
"""Analyze path_heading from log"""
from math import atan2, pi
with open('controller_log.csv') as f:
    lines = [l.strip() for l in f.readlines()[1:]]
filtered = [l.split(',') for l in lines if len(l.split(',')) >= 9]
print("Analysis at key time points:")
for target_t in [10.0, 20.0, 30.0]:
    rows = [p for p in filtered if abs(float(p[0]) - target_t) < 0.05]
    if rows:
        r = rows[0]
        t = float(r[0])
        x, y = float(r[1]), float(r[2])
        heading = float(r[3])
        tx, ty = float(r[4]), float(r[5])
        xte = float(r[8])
        heading_err = float(r[9])
        path_heading = atan2(ty-y, tx-x)
        print(f"\nt={t:.1f}s:")
        print(f"  pos=({x:.3f}, {y:.3f}), heading={heading:.3f} rad = {heading*180/pi:.1f} deg")
        print(f"  target=({tx:.3f}, {ty:.3f})")
        print(f"  path_heading={path_heading:.3f} rad = {path_heading*180/pi:.1f} deg")
        print(f"  xte={xte:.3f}, heading_err={heading_err:.3f}")
        print(f"  => desired_heading = path_heading + xte_correction")
        print(f"             = {path_heading:.3f} + (-1.2 * {xte:.3f})")
        print(f"             = {path_heading - 1.2*xte:.3f} rad = {(path_heading - 1.2*xte)*180/pi:.1f} deg")
python/check_approach_trajectory.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,79 @@
"""
Check trajectory with approach path visualization
"""
import json
# Load controller log
lines = []
with open('controller_log.csv', 'r') as f:
    for line in f:
        lines.append(line.strip())
# Find reached_start marker
reached_start_idx = None
for i, line in enumerate(lines):
    if 'reached_start' in line:
        reached_start_idx = i
        print(f"Found reached_start at line {i+1}")
        break
if reached_start_idx is None:
    print("No reached_start found")
    exit(1)
# Parse and display trajectory
print("\n=== TRAJECTORY ANALYSIS WITH APPROACH PATH ===\n")
# Show first 20 frames after reached_start
print("First 20 frames after reached_start (now following combined path):")
for i in range(reached_start_idx + 1, min(reached_start_idx + 21, len(lines))):
    parts = lines[i].split(',')
    if len(parts) >= 8:
        t = float(parts[0])
        x, y = float(parts[1]), float(parts[2])
        heading = float(parts[3])
        target_x, target_y = float(parts[4]), float(parts[5])
        target_idx = int(parts[7])
        xte = float(parts[8]) if len(parts) > 8 else 0.0
        h_err = float(parts[9]) if len(parts) > 9 else 0.0
        print(f"t={t:6.3f} pos=({x:6.3f},{y:6.3f}) h={heading:5.3f} target=({target_x:6.3f},{target_y:6.3f}) idx={target_idx:3d} xte={xte:6.3f} h_err={h_err:6.3f}")
# Sample every 10 seconds
print("\n\nPosition every 10 seconds:")
for target_t in [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110]:
    for line in lines[reached_start_idx+1:]:
        parts = line.split(',')
        if len(parts) >= 8:
            t = float(parts[0])
            if abs(t - target_t) < 0.1:  # within 0.1s
                x, y = float(parts[1]), float(parts[2])
                heading = float(parts[3])
                print(f"t={t:6.1f}s: pos=({x:7.3f},{y:7.3f}) heading={heading:6.3f}")
                break
# Check if completed
print("\n\nCompletion status:")
last_line = lines[-1]
parts = last_line.split(',')
if len(parts) >= 14:
    t = float(parts[0])
    x, y = float(parts[1]), float(parts[2])
    stage = parts[13]
    print(f"Last frame: t={t:.1f}s, pos=({x:.3f},{y:.3f}), stage={stage}")
    if t >= 119.0:
        print("⚠️ TIMEOUT - did not complete in 120s")
    else:
        print("✓ Completed successfully")
# Load paths and check indices
print("\n\n=== PATH INFORMATION ===")
with open('example_path.json', 'r') as f:
    work_path = json.load(f)
print(f"Work path: {len(work_path)} points ({len(work_path)-1} segments)")
print(f"Work path range: x=[{min(p[0] for p in work_path):.1f}, {max(p[0] for p in work_path):.1f}], "
      f"y=[{min(p[1] for p in work_path):.1f}, {max(p[1] for p in work_path):.1f}]")
# Estimate approach path length
print(f"\nNote: Combined path includes approach path + work path")
print(f"Check simulation_results.png to see both paths visualized")
python/check_path_heading.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,25 @@
"""Check path segment headings"""
from math import atan2, pi
approach_points = [
    (-1.5, -1.0),
    (-1.262, -0.749),
    (-1.045, -0.514),
    (-0.829, -0.309),
    (-0.596, -0.146),
    (-0.326, -0.039),
    (0, 0)
]
print("Approach path segment headings:")
for i in range(len(approach_points)-1):
    p1, p2 = approach_points[i], approach_points[i+1]
    heading = atan2(p2[1]-p1[1], p2[0]-p1[0])
    print(f"Segment {i}->{i+1}: heading={heading:.3f} rad = {heading*180/pi:.1f} deg")
print("\nWork path segment headings (first few):")
work_points = [(0, 0), (10, 0), (10, 1), (0, 1)]
for i in range(len(work_points)-1):
    p1, p2 = work_points[i], work_points[i+1]
    heading = atan2(p2[1]-p1[1], p2[0]-p1[0])
    print(f"Segment {i}->{i+1}: heading={heading:.3f} rad = {heading*180/pi:.1f} deg")
python/check_trajectory.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,42 @@
#!/usr/bin/env python
# æ£€æŸ¥è½¨è¿¹è¯¦ç»†æ•°æ®
with open('controller_log.csv', 'r') as f:
    lines = f.readlines()
# æ‰¾åˆ° reached_start
reached_idx = None
for i, line in enumerate(lines):
    if 'reached_start' in line:
        reached_idx = i
        break
if reached_idx:
    print(f"Found reached_start at line {reached_idx}")
    # å‰20帧的详细数据
    print("\nFirst 20 frames after reached_start:")
    count = 0
    for i in range(reached_idx + 1, min(reached_idx + 100, len(lines))):
        if 'running' in lines[i]:
            parts = lines[i].strip().split(',')
            if len(parts) >= 11:
                t, x, y, heading, tx, ty, tdist, tidx, xte, herr = parts[:10]
                print(f"t={t:>6s} pos=({x:>6s},{y:>6s}) h={heading:>5s} target=({tx:>6s},{ty:>6s}) idx={tidx:>3s} xte={xte:>6s} h_err={herr:>6s}")
                count += 1
                if count >= 20:
                    break
    # å¤šä¸ªæ—¶é—´ç‚¹çš„位置
    print("\n\nPosition every 10 seconds:")
    target_times = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]
    for target_t in target_times:
        for i in range(reached_idx + 1, len(lines)):
            if 'running' in lines[i]:
                parts = lines[i].strip().split(',')
                if len(parts) >= 3:
                    t = float(parts[0])
                    if t >= target_t:
                        x, y, heading = parts[1], parts[2], parts[3]
                        print(f"t={t:>6.1f}s: pos=({x:>7s},{y:>7s}) heading={heading:>6s}")
                        break
python/debug_desired_heading.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,54 @@
"""Debug desired_heading calculation"""
from math import atan2, pi
def wrap_angle(a):
    while a > pi:
        a -= 2 * pi
    while a < -pi:
        a += 2 * pi
    return a
# From log at t=10s
x, y = -0.282, -0.638
heading = -0.470
tx, ty = 0.000, 0.000
xte = 0.573
heading_err_log = -0.095
# Calculate path_heading
path_heading = atan2(ty - y, tx - x)
print(f"path_heading = atan2({ty} - {y}, {tx} - {x}) = atan2({ty-y:.3f}, {tx-x:.3f}) = {path_heading:.3f}")
# Calculate xte_correction
k_xte = 1.2
max_xte = 1.0
xte_gain_scale = 1.0
xte_normalized = max(-1.0, min(1.0, xte / max_xte))
xte_correction = -k_xte * xte_normalized * xte_gain_scale
print(f"xte_correction = -k_xte * xte_normalized * xte_gain_scale")
print(f"               = -{k_xte} * {xte_normalized:.3f} * {xte_gain_scale}")
print(f"               = {xte_correction:.3f}")
# Calculate desired_heading
desired_heading_raw = path_heading + xte_correction
desired_heading = wrap_angle(desired_heading_raw)
print(f"desired_heading = wrap_angle({path_heading:.3f} + {xte_correction:.3f})")
print(f"                = wrap_angle({desired_heading_raw:.3f})")
print(f"                = {desired_heading:.3f}")
# Calculate heading_err
heading_err_calc = wrap_angle(desired_heading - heading)
print(f"\nheading_err = wrap_angle({desired_heading:.3f} - {heading:.3f})")
print(f"            = wrap_angle({desired_heading - heading:.3f})")
print(f"            = {heading_err_calc:.3f}")
print(f"\n heading_err from log: {heading_err_log:.3f}")
print(f"heading_err calculated: {heading_err_calc:.3f}")
print(f"           Difference: {abs(heading_err_log - heading_err_calc):.3f}")
if abs(heading_err_log - heading_err_calc) > 0.1:
    print("\n⚠️ WARNING: Significant difference detected!")
    print("Either:")
    print("1. xte_correction is not being applied")
    print("2. Different target point is being used")
    print("3. Log data is from a different code path")
python/debug_pure_pursuit.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,65 @@
"""Debug Pure Pursuit logic"""
import json
from math import pi, hypot, atan2, cos, sin
from path_planner import plan_approach_path, compute_path_heading, combine_paths
from mower_controller import MowerController
# Load work path
work_path = json.load(open('example_path.json'))
mower_start_pos = (work_path[0][0] - 1.5, work_path[0][1] - 1.0)
mower_start_heading = pi / 4
# Generate approach path
work_path_heading = compute_path_heading(work_path, 0)
approach_path = plan_approach_path(mower_start_pos, mower_start_heading,
                                   work_path[0], work_path_heading,
                                   method='smooth')
# Combine paths
combined_path, approach_end_idx = combine_paths(approach_path, work_path)
print(f"Combined path: {len(combined_path)} points")
print(f"First 10 points:")
for i in range(min(10, len(combined_path))):
    print(f"  {i}: {combined_path[i]}")
# Initialize controller
ctrl = MowerController(combined_path)
print(f"\nDensified path: {len(ctrl.path)} points")
print(f"First 20 densified points:")
for i in range(min(20, len(ctrl.path))):
    print(f"  {i}: {ctrl.path[i]}")
# Simulate a few steps
ctrl.update_gps(pos_xy=mower_start_pos, heading=mower_start_heading)
ctrl.stage = ctrl.STAGE_FOLLOW_PATH  # Skip goto_start
print("\n=== Testing Pure Pursuit Logic ===")
for step in range(5):
    result = ctrl.compute_control(timestamp=step * 0.014)
    x, y = ctrl.pos
    target_idx = ctrl.current_target_idx if hasattr(ctrl, 'current_target_idx') else -1
    if target_idx >= 0 and target_idx < len(ctrl.path):
        tx, ty = ctrl.path[target_idx]
        target_heading = atan2(ty - y, tx - x)
        print(f"\nStep {step}:")
        print(f"  Pos: ({x:.3f}, {y:.3f}), heading: {ctrl.heading:.3f}")
        print(f"  Target idx: {target_idx}, target: ({tx:.3f}, {ty:.3f})")
        print(f"  Target heading: {target_heading:.3f}")
        print(f"  Command: forward={result['forward']}, turn={result['turn']}")
    # Simulate motion (simplified kinematic model)
    dt = 0.014
    forward_mps = result['forward'] / 100.0 * ctrl.max_forward_mps
    yawrate = result['turn'] / 100.0 * ctrl.max_yawrate
    # Update heading
    new_heading = ctrl.heading + yawrate * dt
    # Update position
    vx = forward_mps * cos(new_heading)
    vy = forward_mps * sin(new_heading)
    new_x = x + vx * dt
    new_y = y + vy * dt
    ctrl.update_gps(pos_xy=(new_x, new_y), heading=new_heading)
python/debug_start_check.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,52 @@
import csv, json, math
with open('example_path.json') as f:
    path = json.load(f)
start = tuple(path[0])
rows = []
with open('controller_log.csv') as f:
    reader = csv.DictReader(f)
    for r in reader:
        rows.append(r)
first_follow_idx = None
for i, r in enumerate(rows):
    if r.get('stage','') == 'follow_path':
        first_follow_idx = i
        break
if first_follow_idx is not None:
    print('Found follow_path at row', first_follow_idx, 't=', rows[first_follow_idx]['t'])
    for j in range(max(0, first_follow_idx-5), first_follow_idx+5):
        r = rows[j]
        x = float(r['x']); y = float(r['y']); heading = float(r['heading'])
        dx = start[0]-x; dy = start[1]-y
        desired = math.atan2(dy, dx)
        h_err = (desired - heading + math.pi) % (2*math.pi) - math.pi
        print(j, r['t'], r['stage'], r['status'], f"pos=({x:.3f},{y:.3f})", f"dist={(math.hypot(dx,dy)):.3f}", f"h_err={h_err:.3f}")
else:
    print('No follow_path in log. Scanning for closest approach to start...')
    min_dist = float('inf')
    min_idx = None
    for i, r in enumerate(rows):
        x = float(r['x']); y = float(r['y']);
        d = math.hypot(start[0]-x, start[1]-y)
        if d < min_dist:
            min_dist = d; min_idx = i
    print('min dist to start =', min_dist, 'at row', min_idx, 't=', rows[min_idx]['t'])
    print('Nearby log lines:')
    for j in range(max(0, min_idx-10), min_idx+11):
        r = rows[j]
        x = float(r['x']); y = float(r['y']); heading = float(r['heading'])
        dx = start[0]-x; dy = start[1]-y
        desired = math.atan2(dy, dx)
        h_err = (desired - heading + math.pi) % (2*math.pi) - math.pi
        print(j, r['t'], r['stage'], r['status'], f"pos=({x:.3f},{y:.3f})", f"dist={(math.hypot(dx,dy)):.3f}", f"h_err={h_err:.3f}")
# also print counts of key statuses
from collections import Counter
cnt = Counter((r['stage'], r['status']) for r in rows)
print('\nStage/Status counts (top 20):')
for k,v in cnt.most_common(20):
    print(k, v)
python/mower_controller.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,1013 @@
"""
MowerController
控制器假设与契约:
- åæ ‡ç³»ï¼šXY å¹³é¢å•位为米(RTK æä¾›ï¼‰ï¼ŒZ å¿½ç•¥ç”¨äºŽå¹³é¢è·Ÿè¸ªã€‚
- èˆªå‘(heading):弧度,范围任意,控制器会处理环绕。
- GPS é¢‘率(位置/速度/heading/attitude)约 10Hz;IMU(加速度、角速度)100Hz。
- æŽ§åˆ¶è¾“出频率:74Hz,返回两个信号:forward(-100..100,对应 -1..+1 m/s)和 turn(-100..100,对应 -pi/2..+pi/2 rad/s,正为左转)。
- è·¯å¾„文件:JSON åˆ—表 [[x,y],[x,y],...]
设计说明:
- ä½¿ç”¨çº¯è¿½è¸ªï¼ˆpure pursuit)方法选取前视点,并基于角度误差使用 P æŽ§åˆ¶è®¡ç®—期望偏航速率(rad/s)。
- å‰è¿›é€Ÿåº¦ä¸Žåèˆªè¯¯å·®è€¦åˆï¼šåèˆªè¯¯å·®å¤§æ—¶é™ä½Žå‰è¿›é€Ÿåº¦ä»¥ä¿è¯å¯è½¬å¼¯èƒ½åŠ›ã€‚
接口:
- controller = MowerController(path_points, params)
- controller.update_gps(pos_xy, heading, vel_xy, timestamp)
- controller.update_imu(accel3, gyro3, timestamp)
- controller.compute_control(timestamp) -> {'forward': int, 'turn': int, 'info': {...}}
"""
from math import atan2, hypot, sin, cos, pi, acos
import time
def wrap_angle(a):
    # wrap to [-pi, pi)
    while a >= pi:
        a -= 2 * pi
    while a < -pi:
        a += 2 * pi
    return a
class MowerController:
    def __init__(self, path_points, params=None):
        # path_points: list of [x,y]
        # åŽŸå§‹è·¯å¾„ç‚¹
        original_path = [tuple(p) for p in path_points]
        if len(original_path) == 0:
            raise ValueError("path_points cannot be empty")
        # ä¿å­˜åŽŸå§‹è·¯å¾„ç”¨äºŽæ‹ç‚¹æ£€æµ‹
        self.original_path = original_path
        # è·¯å¾„加密(densify):避免首段过长导致纯追踪前视点跳到远处
        p_params = params or {}
        max_seg_len = p_params.get('max_segment_length', 0.6)  # é»˜è®¤å°†é•¿æ®µåˆ‡åˆ†ä¸º <=0.6m
        densified = []
        for i in range(len(original_path) - 1):
            x1, y1 = original_path[i]
            x2, y2 = original_path[i + 1]
            densified.append((x1, y1))
            dx = x2 - x1
            dy = y2 - y1
            seg_len = hypot(dx, dy)
            if seg_len > max_seg_len and seg_len > 1e-6:
                # éœ€è¦æ’入若干中间点(不包含终点,终点下一循环添加)
                steps = int(seg_len // max_seg_len)
                # ä½¿ç”¨å‡åŒ€çº¿æ€§æ’值(1..steps),防止首段过长纯追踪直接跳过
                for k in range(1, steps + 1):
                    ratio = k / (steps + 1)
                    densified.append((x1 + dx * ratio, y1 + dy * ratio))
        # æ·»åŠ æœ€åŽä¸€ä¸ªç‚¹
        densified.append(original_path[-1])
        self.path = densified
        self.n = len(self.path)
        # é¢„计算每段长度及累积长度用于进度计算
        self.segment_lengths = []
        self.cum_lengths = [0.0]
        for i in range(self.n - 1):
            ax, ay = self.path[i]
            bx, by = self.path[i+1]
            L = hypot(bx-ax, by-ay)
            self.segment_lengths.append(L)
            self.cum_lengths.append(self.cum_lengths[-1] + L)
        self.total_length = self.cum_lengths[-1] if self.cum_lengths else 0.0
        # è¿›åº¦å•调性保护变量
        self.last_progress = 0.0
        # parameters and gains
        p = params or {}
        self.max_forward_mps = p.get('max_forward_mps', 0.6)    # æ›´ä¿å®ˆçš„速度
        self.max_reverse_mps = p.get('max_reverse_mps', 0.2)    # ä¿å®ˆçš„倒车速度
        self.max_yawrate = p.get('max_yawrate', pi/4)          # æ›´ä¿å®ˆçš„转向速率
        # è½¯å¯åŠ¨å’ŒåŠ å‡é€Ÿ
        self.accel = p.get('accel', 0.2)                       # m/s^2
        self.current_speed_target = 0.0                         # ç›®æ ‡é€Ÿåº¦
        self.current_speed = 0.0                               # å®žé™…速度
        # æŽ§åˆ¶å¢žç›Šï¼ˆæé«˜å“åº”速度和精度)
        self.k_heading = p.get('k_heading', 1.8)               # å¤§å¹…提高转向增益,加快航向收敛
        self.k_heading_d = p.get('k_heading_d', 0.25)          # å¢žåŠ å¾®åˆ†ä½œç”¨ï¼ŒæŠ‘åˆ¶æŒ¯è¡
        self.k_xte = p.get('k_xte', 1.2)                       # å¤§å¹…提高横向误差增益,激进纠正偏离
        self.k_curv = p.get('k_curv', 0.2)                     # é™ä½Žæ›²çŽ‡å‰é¦ˆå¢žç›Š
        self.base_speed = p.get('base_speed', 0.5)             # æé«˜åŸºç¡€é€Ÿåº¦
        # è·¯å¾„跟踪参数(减小lookahead以提高精度)
        self.lookahead_time = p.get('lookahead_time', 2.0)    # å‡å°å‰è§†æ—¶é—´
        # å‡å°æœ€å°å‰è§†è·ç¦»ä»¥æé«˜è½¨è¿¹ç²¾åº¦
        self.lookahead_min = p.get('lookahead_min', 0.35)
        self.lookahead_max = p.get('lookahead_max', 1.5)      # å‡å°æœ€å¤§å‰è§†è·ç¦»
        self.goal_tolerance = p.get('goal_tolerance', 0.3)     # m
        self.slow_zone = p.get('slow_zone', 1.5)              # å‡å°å‡é€ŸåŒºåŸŸ
        self.final_approach_dist = p.get('final_approach_dist', 0.8)  # å‡å°ç²¾ç¡®æŽ¥è¿‘区域
        # æ–°å¢žï¼šæ¨ªå‘误差相关
        self.max_xte = p.get('max_xte', 1.0)                  # å‡å°æœ€å¤§æ¨ªå‘误差限制,提高精度
        self.xte_gain_scale = p.get('xte_gain_scale', 1.0)    # ç§»é™¤æ¨ªå‘误差缩放,使用完整增益
        # é€Ÿåº¦æŽ§åˆ¶ç¼©æ”¾å‚æ•°
        self.curv_gain_scale = p.get('curv_gain_scale', 2.0)  # æ›²çŽ‡ç¼©æ”¾å¢žç›Š
        self.heading_err_gain_scale = p.get('heading_err_gain_scale', 1.5)  # èˆªå‘误差缩放增益
        # çŠ¶æ€ä¼°è®¡å’Œæ»¤æ³¢
        self.yawrate_filter = 0.0
        self.yawrate_alpha = p.get('yawrate_alpha', 0.90)     # è¿›ä¸€æ­¥å¢žåŠ å¹³æ»‘åº¦
        self.last_heading_err = 0.0                           # ç”¨äºŽD控制
        self.last_time = None                                 # ç”¨äºŽè®¡ç®—dt
        self.last_curv = 0.0                                  # ç”¨äºŽå¹³æ»‘曲率变化
        # ç´¯ç§¯è¯¯å·®æŽ§åˆ¶
        self.heading_err_sum = 0.0                            # èˆªå‘误差积分
        self.xte_sum = 0.0                                    # æ¨ªå‘误差积分
        self.max_sum = 2.0                                    # ç§¯åˆ†é™å¹…
        self.k_heading_i = p.get('k_heading_i', 0.01)        # èˆªå‘误差积分增益
        self.k_xte_i = p.get('k_xte_i', 0.01)               # æ¨ªå‘误差积分增益
        # é”™è¯¯æ¢å¤
        self.consecutive_large_err = 0                        # è¿žç»­å¤§è¯¯å·®è®¡æ•°
        self.large_err_threshold = p.get('large_err_threshold', 0.5)  # å¤§è¯¯å·®é˜ˆå€¼
        self.max_large_err_count = 10                         # æœ€å¤§è¿žç»­å¤§è¯¯å·®æ¬¡æ•°
    # (日志文件将在下面统一创建)
        # state (most recent readings)
        self.pos = None  # (x,y)
        self.heading = None  # rad
        self.vel = (0.0, 0.0)  # vx, vy in m/s (ground frame)
        self.imu_accel = None
        self.imu_gyro = None
        # control stages
        self.STAGE_GOTO_START = 'goto_start'  # ç§»åŠ¨åˆ°èµ·ç‚¹é˜¶æ®µ
        self.STAGE_FOLLOW_PATH = 'follow_path' # è·Ÿéšè·¯å¾„阶段
        self.stage = self.STAGE_GOTO_START
        # èµ·ç‚¹æŽ¥è¿‘参数 - æ”¶ç´§ä»¥ç¡®ä¿çœŸæ­£åˆ°è¾¾èµ·ç‚¹
        self.start_tolerance = p.get('start_tolerance', 0.12)   # ä½ç½®å®¹å·®ï¼ˆæ”¶ç´§åˆ°12cm)
        self.start_heading_tolerance = p.get('start_heading_tolerance', 0.01)  # èˆªå‘容差 (~14°,收紧)
        self.start_approach_dist = p.get('start_approach_dist', 1.5)  # å‡é€Ÿè·ç¦»
        self.start_slow_speed = p.get('start_slow_speed', 0.25)  # æŽ¥è¿‘速度
        self.start_min_speed = p.get('start_min_speed', 0.25)   # æé«˜æœ€å°é€Ÿåº¦
        # æ–°å¢žï¼šåˆ°èµ·ç‚¹æ—¶å…ˆè½¬å‘再前进的阈值(若与起点方向误差大于此阈值,则先转向)
        self.start_turn_first_heading = p.get('start_turn_first_heading', pi/50)  # ~15度
        # è·¯å¾„跟随阶段的最小前进速度(防止映射/缩放导致速度归零)
        self.min_follow_speed = p.get('min_follow_speed', 0.08)
        # Recovery æ¨¡å¼å‚æ•°
        self.in_recovery = False
        self.recovery_dist = p.get('recovery_dist', 2.0)
        self.recovery_exit_dist = p.get('recovery_exit_dist', 0.8)
        self.recovery_speed = p.get('recovery_speed', 0.55)
        # progress - åªä½¿ç”¨ç‚¹ç´¢å¼•,不使用segment
        self.current_target_idx = 0
        self.nearest_path_idx = 0  # æœ€è¿‘路径点索引
        self.finished = False
        # goto_start å­çŠ¶æ€ï¼š'rotate'|'move'|'align',保证顺序执行且不反复切换
        self.goto_substage = 'rotate'
        # æ‹ç‚¹è½¬å‘参数
        self.waypoint_approach_mode = False  # æ˜¯å¦åœ¨æŽ¥è¿‘拐点模式
        self.waypoint_turn_mode = False  # æ˜¯å¦åœ¨æ‹ç‚¹è½¬å‘模式
        self.waypoint_turn_target_idx = None  # å½“前转向的拐点在原始路径中的索引
        self.waypoint_turn_tolerance = p.get('waypoint_turn_tolerance', 0.12)  # åˆ°è¾¾æ‹ç‚¹çš„距离容差(12cm)
        self.waypoint_approach_dist = p.get('waypoint_approach_dist', 0.8)  # å¼€å§‹å‡é€ŸæŽ¥è¿‘拐点的距离(80cm)
        self.waypoint_turn_heading_tol = p.get('waypoint_turn_heading_tol', 0.1)  # è½¬å‘完成的航向容差(~6度)
        self.waypoint_turned = set()  # è®°å½•已经转向过的拐点,避免重复转向
        # åˆ›å»ºæ—¥å¿—文件
        self.log_file = open('controller_log.csv', 'w')
        # ç®€åŒ–日志格式:移除segment相关
        self.log_file.write('t,x,y,heading,target_x,target_y,target_dist,target_idx,xte,heading_err,forward,turn,speed,stage,status\n')
    def update_gps(self, pos_xy, heading, vel_xy=None, timestamp=None):
        """Update GPS-like measurements at ~10Hz.
        pos_xy: (x,y) in meters
        heading: radians
        vel_xy: (vx, vy) in m/s in global frame (optional)
        """
        self.pos = tuple(pos_xy)
        self.heading = float(heading)
        if vel_xy is not None:
            self.vel = tuple(vel_xy)
    def update_imu(self, accel3, gyro3, timestamp=None):
        """Update IMU (100Hz). accel3 and gyro3 are 3-element tuples."""
        self.imu_accel = tuple(accel3)
        self.imu_gyro = tuple(gyro3)
    def _calc_xte(self, p1, p2, pos):
        """Calculate cross track error (perpendicular distance to line segment)"""
        x, y = pos
        x1, y1 = p1
        x2, y2 = p2
        # Convert to local coordinates
        dx = x2 - x1
        dy = y2 - y1
        l2 = dx*dx + dy*dy
        if l2 == 0:  # points are identical
            return hypot(x - x1, y - y1)
        # Project point onto line
        t = ((x - x1)*dx + (y - y1)*dy) / l2
        if t < 0:  # before start
            return hypot(x - x1, y - y1)
        elif t > 1:  # after end
            return hypot(x - x2, y - y2)
        else:  # projection point lies on segment
            px = x1 + t*dx
            py = y1 + t*dy
            xte = hypot(x - px, y - py)
            # Determine sign based on which side of the line the point is on
            sign = 1 if (x - px)*dy - (y - py)*dx > 0 else -1
            return sign * xte
    def _is_waypoint_turn_needed(self, idx):
        """
        æ£€æµ‹è·¯å¾„点idx是否是拐点(需要原地转向)
        æ‹ç‚¹å®šä¹‰ï¼šå‰åŽä¸¤æ®µè·¯å¾„方向变化超过45度
        """
        if idx <= 0 or idx >= len(self.path) - 1:
            return False
        # å‰ä¸€æ®µæ–¹å‘
        p_prev = self.path[idx - 1]
        p_curr = self.path[idx]
        p_next = self.path[idx + 1]
        dx1 = p_curr[0] - p_prev[0]
        dy1 = p_curr[1] - p_prev[1]
        dx2 = p_next[0] - p_curr[0]
        dy2 = p_next[1] - p_curr[1]
        # è®¡ç®—两段方向
        heading1 = atan2(dy1, dx1)
        heading2 = atan2(dy2, dx2)
        # æ–¹å‘变化角度
        angle_diff = abs(wrap_angle(heading2 - heading1))
        # å¦‚果方向变化超过45度,认为是拐点
        return angle_diff > pi / 4
    def _find_nearest_original_waypoint(self):
        """
        åœ¨åŽŸå§‹è·¯å¾„ä¸­æ‰¾åˆ°æœ€æŽ¥è¿‘å½“å‰ä½ç½®çš„æ‹ç‚¹
        Returns: (waypoint_idx, distance) æˆ– (None, None)
        """
        if self.pos is None:
            return None, None
        x, y = self.pos
        nearest_waypoint_idx = None
        nearest_dist = float('inf')
        # éåŽ†åŽŸå§‹è·¯å¾„çš„æ‰€æœ‰æ‹ç‚¹
        for i in range(1, len(self.original_path) - 1):
            # æ£€æŸ¥æ˜¯å¦æ˜¯æ‹ç‚¹
            p_prev = self.original_path[i - 1]
            p_curr = self.original_path[i]
            p_next = self.original_path[i + 1]
            dx1 = p_curr[0] - p_prev[0]
            dy1 = p_curr[1] - p_prev[1]
            dx2 = p_next[0] - p_curr[0]
            dy2 = p_next[1] - p_curr[1]
            heading1 = atan2(dy1, dx1)
            heading2 = atan2(dy2, dx2)
            angle_diff = abs(wrap_angle(heading2 - heading1))
            # å¦‚果是拐点(方向变化>45度)
            if angle_diff > pi / 4:
                dist = hypot(p_curr[0] - x, p_curr[1] - y)
                if dist < nearest_dist:
                    nearest_dist = dist
                    nearest_waypoint_idx = i
        return nearest_waypoint_idx, nearest_dist
    def _get_target_heading_at_waypoint(self, idx):
        """获取原始路径点idx处的目标航向(下一段的方向)"""
        if idx >= len(self.original_path) - 1:
            # æœ€åŽä¸€ä¸ªç‚¹ï¼Œè¿”回前一段方向
            if idx > 0:
                dx = self.original_path[idx][0] - self.original_path[idx-1][0]
                dy = self.original_path[idx][1] - self.original_path[idx-1][1]
                return atan2(dy, dx)
            return 0.0
        # è¿”回到下一个点的方向
        dx = self.original_path[idx + 1][0] - self.original_path[idx][0]
        dy = self.original_path[idx + 1][1] - self.original_path[idx][1]
        return atan2(dy, dx)
    def _find_lookahead_point(self, lookahead_dist):
        """
        ç®€åŒ–çš„Pure Pursuit前视点查找 - åªä½¿ç”¨ç‚¹ç´¢å¼•
        ä»Žnearest_path_idx开始向前搜索,找到第一个距离>=lookahead_dist的点
        Returns: (lookahead_xy, curvature, lookahead_idx, nearest_idx)
        """
        if self.pos is None:
            return self.path[0], 0.0, 0, 0
        x, y = self.pos
        # 1. æ‰¾æœ€è¿‘点 - å¦‚果离路径很远,使用全局搜索;否则使用窗口搜索
        # å…ˆæ£€æŸ¥å½“前nearest_path_idx的距离
        px, py = self.path[self.nearest_path_idx]
        current_nearest_dist = hypot(px - x, py - y)
        # å¦‚果距离>1.0m,使用全局搜索
        if current_nearest_dist > 1.0:
            search_start = 0
            search_end = len(self.path)
        else:
            # ä½¿ç”¨çª—口搜索
            search_start = max(0, self.nearest_path_idx - 5)
            search_end = min(len(self.path), self.nearest_path_idx + 30)
        nearest_idx = self.nearest_path_idx
        nearest_dist = float('inf')
        for i in range(search_start, search_end):
            px, py = self.path[i]
            d = hypot(px - x, py - y)
            if d < nearest_dist:
                nearest_dist = d
                nearest_idx = i
        self.nearest_path_idx = nearest_idx  # æ›´æ–°æœ€è¿‘点索引
        # 2. ä»Žnearest_idx开始向前搜索lookahead点
        lookahead_idx = nearest_idx
        for i in range(nearest_idx, len(self.path)):
            px, py = self.path[i]
            d = hypot(px - x, py - y)
            if d >= lookahead_dist:
                lookahead_idx = i
                break
        else:
            # æ²¡æ‰¾åˆ°ï¼Œä½¿ç”¨æœ€åŽä¸€ä¸ªç‚¹
            lookahead_idx = len(self.path) - 1
        # 3. ä¼°è®¡æ›²çŽ‡ï¼ˆåŸºäºŽnearest_idx附近的点)
        curvature = 0.0
        if 1 <= nearest_idx < len(self.path) - 1:
            p1 = self.path[nearest_idx - 1]
            p2 = self.path[nearest_idx]
            p3 = self.path[nearest_idx + 1]
            v1 = (p2[0] - p1[0], p2[1] - p1[1])
            v2 = (p3[0] - p2[0], p3[1] - p2[1])
            l1 = hypot(v1[0], v1[1])
            l2 = hypot(v2[0], v2[1])
            if l1 > 1e-6 and l2 > 1e-6:
                dot = v1[0]*v2[0] + v1[1]*v2[1]
                cos_angle = max(-1.0, min(1.0, dot / (l1 * l2)))
                angle_change = abs(pi - acos(cos_angle))
                curvature = angle_change / max(l1, 1e-6)
        return self.path[lookahead_idx], curvature, lookahead_idx, nearest_idx
    def _find_nearest_original_waypoint(self):
        """
        åœ¨åŽŸå§‹è·¯å¾„ä¸­æŸ¥æ‰¾ä¸‹ä¸€ä¸ªæœªåˆ°è¾¾çš„æ‹ç‚¹
        æŒ‰ç…§è·¯å¾„顺序查找,不跳过
        Returns: (waypoint_idx, distance) æˆ– (None, inf) å¦‚果没有找到
        """
        if not hasattr(self, 'original_path') or len(self.original_path) < 3:
            return None, float('inf')
        x, y = self.pos
        # éåŽ†åŽŸå§‹è·¯å¾„ä¸­çš„æ‹ç‚¹ï¼ˆè·³è¿‡èµ·ç‚¹å’Œç»ˆç‚¹ï¼‰
        for i in range(1, len(self.original_path) - 1):
            # å¦‚果这个拐点已经转向过,跳过
            if i in self.waypoint_turned:
                continue
            # æ£€æŸ¥æ–¹å‘是否改变
            prev_pt = self.original_path[i-1]
            curr_pt = self.original_path[i]
            next_pt = self.original_path[i+1]
            # è®¡ç®—前后方向向量
            dx1 = curr_pt[0] - prev_pt[0]
            dy1 = curr_pt[1] - prev_pt[1]
            dx2 = next_pt[0] - curr_pt[0]
            dy2 = next_pt[1] - curr_pt[1]
            # è®¡ç®—角度
            angle1 = atan2(dy1, dx1)
            angle2 = atan2(dy2, dx2)
            angle_change = abs(wrap_angle(angle2 - angle1))
            # å¦‚果角度变化大于45度,认为是拐点
            if angle_change > pi / 4:
                # æ‰¾åˆ°ç¬¬ä¸€ä¸ªæœªå®Œæˆçš„æ‹ç‚¹ï¼Œç›´æŽ¥è¿”回
                dist = hypot(curr_pt[0] - x, curr_pt[1] - y)
                return i, dist
        return None, float('inf')
    def _get_target_heading_at_waypoint(self, waypoint_idx):
        """
        èŽ·å–æ‹ç‚¹å¤„çš„ç›®æ ‡èˆªå‘ï¼ˆä¸‹ä¸€æ®µè·¯å¾„çš„æ–¹å‘ï¼‰
        Args:
            waypoint_idx: æ‹ç‚¹åœ¨åŽŸå§‹è·¯å¾„ä¸­çš„ç´¢å¼•
        Returns: ç›®æ ‡èˆªå‘(弧度)
        """
        if waypoint_idx is None:
            return 0.0  # é»˜è®¤å‘东
        # å¦‚果是最后一个点,使用前一段的方向(保持不变)
        if waypoint_idx >= len(self.original_path) - 1:
            if waypoint_idx > 0:
                prev_pt = self.original_path[waypoint_idx - 1]
                curr_pt = self.original_path[waypoint_idx]
                return atan2(curr_pt[1] - prev_pt[1], curr_pt[0] - prev_pt[0])
            else:
                return 0.0  # åªæœ‰ä¸€ä¸ªç‚¹ï¼Œé»˜è®¤å‘东
        curr_pt = self.original_path[waypoint_idx]
        next_pt = self.original_path[waypoint_idx + 1]
        dx = next_pt[0] - curr_pt[0]
        dy = next_pt[1] - curr_pt[1]
        return atan2(dy, dx)
    def _compute_xte_simple(self, pos):
        """
        ç®€åŒ–的横向误差计算 - åŸºäºŽæœ€è¿‘点
        Returns: (xte, proj_x, proj_y)
        """
        x, y = pos
        nearest_idx = self.nearest_path_idx
        # è®¡ç®—到最近点的向量
        px, py = self.path[nearest_idx]
        # å¦‚果有下一个点,计算到路径段的横向误差
        if nearest_idx < len(self.path) - 1:
            nx, ny = self.path[nearest_idx + 1]
            # æ®µæ–¹å‘向量
            dx = nx - px
            dy = ny - py
            seg_len2 = dx*dx + dy*dy
            if seg_len2 > 1e-9:
                # æŠ•影到段上
                t = ((x - px)*dx + (y - py)*dy) / seg_len2
                t = max(0.0, min(1.0, t))  # é™åˆ¶åœ¨[0,1]
                proj_x = px + t * dx
                proj_y = py + t * dy
                # æ¨ªå‘误差(带符号)
                xte_vec_x = x - proj_x
                xte_vec_y = y - proj_y
                xte_mag = hypot(xte_vec_x, xte_vec_y)
                # ä½¿ç”¨å‰ä¹˜åˆ¤æ–­å·¦å³
                cross = xte_vec_x * dy - xte_vec_y * dx
                xte = xte_mag if cross > 0 else -xte_mag
                return xte, proj_x, proj_y
        # å¦åˆ™ç›´æŽ¥ç”¨åˆ°æœ€è¿‘点的距离
        xte = hypot(x - px, y - py)
        return xte, px, py
    def compute_control(self, timestamp=None):
        """Compute forward and turn outputs mapped to [-100,100].
        Returns dict with keys: forward(int), turn(int), info(dict)
        """
        if self.finished:
            return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}}
        if self.pos is None or self.heading is None:
            return {'forward': 0, 'turn': 0, 'info': {'status': 'no_state'}}
        # ensure timestamp is numeric for logging/formatting
        if timestamp is None:
            timestamp = time.time()
        # è®¡ç®—时间增量
        if self.last_time is None:
            dt = 0.014  # çº¦74Hz
        else:
            dt = timestamp - self.last_time if timestamp else 0.014
        self.last_time = timestamp if timestamp else (self.last_time + dt if self.last_time else 0.0)
        x, y = self.pos
        # å¤„理移动到起点阶段:使用持久子状态确保顺序执行:rotate -> move -> align
        if self.stage == self.STAGE_GOTO_START:
            start_point = self.path[0]
            dx = start_point[0] - x
            dy = start_point[1] - y
            dist_to_start = hypot(dx, dy)
            # ç›®æ ‡æœå‘指向起点
            desired_heading_to_start = atan2(dy, dx)
            heading_err_to_start = wrap_angle(desired_heading_to_start - self.heading)
            # åˆå§‹åŒ–子状态(保守)
            if not hasattr(self, 'goto_substage') or self.goto_substage is None:
                self.goto_substage = 'rotate'
            # å­çŠ¶æ€ï¼šrotate -> move -> align
            if self.goto_substage == 'rotate':
                # åŽŸåœ°è½¬å‘ç›´åˆ°æœå‘è¯¯å·®è¶³å¤Ÿå°ï¼ˆç§»é™¤è·ç¦»é€€å‡ºæ¡ä»¶ï¼Œç¡®ä¿å…ˆè½¬å¥½ï¼‰
                if abs(heading_err_to_start) <= self.start_turn_first_heading:
                    self.goto_substage = 'move'
                else:
                    yawrate_cmd = self.k_heading * heading_err_to_start * 1.3  # ç¨å¾®æé«˜è½¬å‘速度
                    turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100))
                    turn_signal = max(-100, min(100, turn_signal))
                    forward_signal = 0
                    # target is start_point for goto_start substage
                    target_x, target_y = start_point
                    target_dist = hypot(target_x - x, target_y - y)
                    target_idx = 0
                    self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                        f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},'
                                        f'0.000,{heading_err_to_start:.3f},{forward_signal},{turn_signal},0.000,{self.stage},rotate_to_start\n')
                    self.log_file.flush()
                    return {'forward': forward_signal, 'turn': turn_signal, 'info': {
                        'status': 'rotate_to_start', 'stage': self.stage, 'heading_err': heading_err_to_start,
                        'dist_to_start': dist_to_start
                    }}
            if self.goto_substage == 'move':
                # åœ¨ç§»åŠ¨å­çŠ¶æ€æœŸé—´ï¼Œä¸ä¼šå›žåˆ° rotate å­çŠ¶æ€ï¼Œç›´åˆ°æ˜¾å¼åˆ‡æ¢
                desired_heading = desired_heading_to_start
                heading_err = wrap_angle(desired_heading - self.heading)
                # å¦‚果已经接近起点则进入 align å­çŠ¶æ€
                if dist_to_start < self.start_tolerance:
                    self.goto_substage = 'align'
                else:
                    # å¸¸è§„:面向起点并前进
                    if dist_to_start < self.start_approach_dist:
                        dist_ratio = dist_to_start / self.start_approach_dist
                        speed_scale = 0.4 + 0.6 * dist_ratio  # æé«˜æœ€å°é€Ÿåº¦æ¯”例
                        base_target_speed = self.start_slow_speed + (self.base_speed - self.start_slow_speed) * dist_ratio
                    else:
                        speed_scale = 1.0
                        base_target_speed = min(self.base_speed * 0.8, 0.5)  # é™åˆ¶æœ€å¤§é€Ÿåº¦é¿å…è¿‡å†²
                    # èˆªå‘误差缩放更宽松
                    heading_ratio = abs(heading_err) / pi
                    err_scale = 0.5 + 0.5 * (1.0 - heading_ratio)  # æ›´å®½æ¾
                    target_speed = max(self.start_min_speed * 1.5, base_target_speed * speed_scale * err_scale)
                    target_speed = max(target_speed, self.start_min_speed * 1.2)  # ç¡®ä¿æœ€å°é€Ÿåº¦
                    yawrate_cmd = self.k_heading * heading_err
                    forward_signal = int(round((target_speed / self.max_forward_mps) * 100))
                    forward_signal = max(-100, min(100, forward_signal))
                    turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100))
                    turn_signal = max(-100, min(100, turn_signal))
                    target_x, target_y = start_point
                    target_dist = hypot(target_x - x, target_y - y)
                    target_idx = 0
                    self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                      f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},'
                                      f'0.000,{heading_err:.3f},{forward_signal},{turn_signal},{target_speed:.3f},{self.stage},moving_to_start\n')
                    self.log_file.flush()
                    return {'forward': forward_signal, 'turn': turn_signal, 'info': {
                        'status': 'moving_to_start', 'stage': self.stage, 'dist_to_start': dist_to_start,
                        'heading_err': heading_err
                    }}
            # align å­çŠ¶æ€ï¼šåˆ°è¾¾èµ·ç‚¹åŽè¿›è¡ŒåŽŸåœ°æœå‘ä¿®æ­£ï¼Œå®ŒæˆåŽåˆ‡æ¢åˆ°è·Ÿè¸ª
            if self.goto_substage == 'align':
                if len(self.path) > 1:
                    path_dx = self.path[1][0] - self.path[0][0]
                    path_dy = self.path[1][1] - self.path[0][1]
                    desired_start_heading = atan2(path_dy, path_dx)
                else:
                    desired_start_heading = 0.0
                heading_err_final = wrap_angle(desired_start_heading - self.heading)
                # å¦‚果最终朝向对齐或非常接近位置,则切换到跟踪
                # åªæœ‰åŒæ—¶æ»¡è¶³ä½ç½®ä¸Žèˆªå‘条件才算真正到达起点,避免“偏近+朝向差”提前切换
                if (dist_to_start < self.start_tolerance) and (abs(heading_err_final) < self.start_heading_tolerance):
                    self.stage = self.STAGE_FOLLOW_PATH
                    self.goto_substage = None
                    # ä¿æŒ current_target_idx = 0,让 pure pursuit è‡ªç„¶é€‰æ‹©å‰è§†ç‚¹
                    self.current_target_idx = 0
                    # é‡ç½®é€Ÿåº¦ä¸ºè¾ƒå°å€¼ï¼Œé¿å… lookahead è¿‡å¤§
                    self.current_speed = self.start_min_speed
                    # å°å¹…前进以确保不卡在起点
                    f_sig = int(round((self.start_min_speed / self.max_forward_mps) * 100))
                    target_x, target_y = self.path[0]  # ä½¿ç”¨èµ·ç‚¹ä½œä¸ºåˆå§‹ç›®æ ‡
                    target_dist = hypot(target_x - x, target_y - y)
                    target_idx = 0
                    self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                        f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},'
                                        f'0.000,{heading_err_final:.3f},{f_sig},0,{self.start_min_speed:.3f},{self.stage},reached_start\n')
                    self.log_file.flush()
                    return {'forward': f_sig, 'turn': 0, 'info': {'status': 'reached_start', 'stage': self.stage, 'advanced_idx': self.current_target_idx}}
                else:
                    # åœ¨èµ·ç‚¹é™„近但朝向不对,进行原地修正(停止前进)
                    sign = 1 if heading_err_final >= 0 else -1
                    yawrate_cmd = sign * min(self.max_yawrate * 0.6, max(0.05, abs(heading_err_final) * 0.8))
                    turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100))
                    turn_signal = max(-100, min(100, turn_signal))
                    target_x, target_y = start_point
                    target_dist = hypot(target_x - x, target_y - y)
                    target_idx = 0
                    self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                        f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},'
                                        f'0.000,{heading_err_final:.3f},0,{turn_signal},{0.000:.3f},{self.stage},align_at_start\n')
                    self.log_file.flush()
                    return {'forward': 0, 'turn': turn_signal, 'info': {'status': 'align_at_start', 'heading_err': heading_err_final}}
        # è®¡ç®—时间增量
        if self.last_time is None:
            dt = 0.014  # çº¦74Hz
        else:
            dt = timestamp - self.last_time if timestamp else 0.014
        self.last_time = timestamp if timestamp else (self.last_time + dt if self.last_time else 0.0)
        # è®¡ç®—横向误差(使用简化方法)
        x, y = self.pos
        xte, proj_x, proj_y = self._compute_xte_simple(self.pos)
        # é¢„先计算到终点的距离
        final_dist = hypot(self.path[-1][0] - x, self.path[-1][1] - y)
        # ========== æ‹ç‚¹æŽ¥è¿‘和转向模式检测 ==========
        # æ£€æŸ¥æ˜¯å¦éœ€è¦è¿›å…¥æ‹ç‚¹æŽ¥è¿‘或转向模式
        if not self.waypoint_approach_mode and not self.waypoint_turn_mode:
            # åœ¨åŽŸå§‹è·¯å¾„ä¸­æŸ¥æ‰¾æœ€è¿‘çš„æ‹ç‚¹
            waypoint_idx, waypoint_dist = self._find_nearest_original_waypoint()
            # å¦‚果拐点在接近距离内(2m)且未转向过,进入接近模式
            # æ³¨æ„ï¼šå¢žå¤§æ£€æµ‹è·ç¦»ï¼Œç¡®ä¿åœ¨çŸ­è¾¹ä¹Ÿèƒ½æå‰è¿›å…¥æŽ¥è¿‘模式
            if (waypoint_idx is not None and
                waypoint_dist < 2.0 and
                waypoint_idx not in self.waypoint_turned):
                self.waypoint_approach_mode = True
                self.waypoint_turn_target_idx = waypoint_idx  # è®°å½•当前接近的拐点
                self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                    f'{self.original_path[waypoint_idx][0]:.3f},'
                                    f'{self.original_path[waypoint_idx][1]:.3f},'
                                    f'{waypoint_dist:.3f},{waypoint_idx},'
                                    f'{xte:.3f},0.000,0,0,0.000,{self.stage},enter_waypoint_approach\n')
                self.log_file.flush()
        # ========== æ‹ç‚¹æŽ¥è¿‘模式:精确接近拐点位置 ==========
        if self.waypoint_approach_mode:
            waypoint_x, waypoint_y = self.original_path[self.waypoint_turn_target_idx]
            dx = waypoint_x - x
            dy = waypoint_y - y
            waypoint_dist = hypot(dx, dy)
            # æ£€æŸ¥æ˜¯å¦åˆ°è¾¾æ‹ç‚¹ï¼ˆ12cm容差)
            if waypoint_dist < self.waypoint_turn_tolerance:
                # åˆ°è¾¾æ‹ç‚¹ï¼Œåˆ‡æ¢åˆ°è½¬å‘模式
                self.waypoint_approach_mode = False
                self.waypoint_turn_mode = True
                self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                    f'{waypoint_x:.3f},{waypoint_y:.3f},'
                                    f'{waypoint_dist:.3f},{self.waypoint_turn_target_idx},'
                                    f'{xte:.3f},0.000,0,0,0.000,{self.stage},reached_waypoint\n')
                self.log_file.flush()
            else:
                # ç›´æŽ¥æœå‘拐点前进(忽略路径,直接目标点导航)
                desired_heading_to_waypoint = atan2(dy, dx)
                heading_err_to_waypoint = wrap_angle(desired_heading_to_waypoint - self.heading)
                # é€Ÿåº¦æŽ§åˆ¶ï¼šæŽ¥è¿‘时逐渐减速
                if waypoint_dist < 0.5:  # 50cm内减速
                    speed_ratio = waypoint_dist / 0.5
                    target_speed = self.start_min_speed * (0.4 + 0.6 * speed_ratio)
                else:
                    target_speed = self.base_speed * 0.7
                # èˆªå‘误差也会降低速度
                heading_ratio = abs(heading_err_to_waypoint) / pi
                err_scale = 0.5 + 0.5 * (1.0 - heading_ratio)
                target_speed = target_speed * err_scale
                target_speed = max(self.start_min_speed * 0.5, target_speed)
                # è½¬å‘控制
                yawrate_cmd = self.k_heading * heading_err_to_waypoint * 1.3  # ç¨å¾®æé«˜è½¬å‘增益
                forward_signal = int(round((target_speed / self.max_forward_mps) * 100))
                forward_signal = max(-100, min(100, forward_signal))
                turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100))
                turn_signal = max(-100, min(100, turn_signal))
                self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                    f'{waypoint_x:.3f},{waypoint_y:.3f},'
                                    f'{waypoint_dist:.3f},{self.waypoint_turn_target_idx},'
                                    f'{xte:.3f},{heading_err_to_waypoint:.3f},{forward_signal},{turn_signal},{target_speed:.3f},{self.stage},approaching_waypoint\n')
                self.log_file.flush()
                return {'forward': forward_signal, 'turn': turn_signal, 'info': {
                    'status': 'approaching_waypoint',
                    'waypoint_dist': waypoint_dist,
                    'heading_err': heading_err_to_waypoint,
                    'target_speed': target_speed
                }}
        # ========== æ‹ç‚¹è½¬å‘模式:原地转向 ==========
        if self.waypoint_turn_mode:
            # èŽ·å–ç›®æ ‡èˆªå‘ï¼ˆä¸‹ä¸€æ®µçš„æ–¹å‘ï¼‰
            target_heading = self._get_target_heading_at_waypoint(self.waypoint_turn_target_idx)
            heading_err = wrap_angle(target_heading - self.heading)
            # æ£€æŸ¥æ˜¯å¦è½¬å‘完成
            if abs(heading_err) < self.waypoint_turn_heading_tol:
                # è½¬å‘完成,退出转向模式
                self.waypoint_turned.add(self.waypoint_turn_target_idx)  # è®°å½•已转向
                self.waypoint_turn_mode = False
                self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                    f'{self.original_path[self.waypoint_turn_target_idx][0]:.3f},'
                                    f'{self.original_path[self.waypoint_turn_target_idx][1]:.3f},'
                                    f'0.000,{self.waypoint_turn_target_idx},'
                                    f'{xte:.3f},{heading_err:.3f},0,0,0.000,{self.stage},exit_waypoint_turn\n')
                self.log_file.flush()
            else:
                # åŽŸåœ°è½¬å‘ï¼šforward=0, turn基于航向误差
                yawrate_cmd = self.k_heading * heading_err * 1.2  # ç¨å¾®æé«˜è½¬å‘增益
                turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100))
                turn_signal = max(-100, min(100, turn_signal))
                self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                    f'{self.original_path[self.waypoint_turn_target_idx][0]:.3f},'
                                    f'{self.original_path[self.waypoint_turn_target_idx][1]:.3f},'
                                    f'0.000,{self.waypoint_turn_target_idx},'
                                    f'{xte:.3f},{heading_err:.3f},0,{turn_signal},0.000,{self.stage},waypoint_turning\n')
                self.log_file.flush()
                return {'forward': 0, 'turn': turn_signal, 'info': {
                    'status': 'waypoint_turning',
                    'heading_err': heading_err,
                    'target_heading': target_heading
                }}
        # åŠ¨æ€å‰è§†è·ç¦»ï¼šåŸºäºŽå½“å‰é€Ÿåº¦å’Œæ¨ªå‘è¯¯å·®
        actual_speed = abs(self.current_speed)
        xte_mag = abs(xte)
        # lookahead = lookahead_min + speed_component - xte_penalty
        base_lookahead = self.lookahead_min + self.lookahead_time * actual_speed - 0.3 * min(xte_mag, 0.5)
        lookahead = max(self.lookahead_min, min(self.lookahead_max, base_lookahead))
        # æ¢å¤æ¨¡å¼åˆ¤å®š
        dist_to_path = abs(xte)
        if not self.in_recovery and dist_to_path > self.recovery_dist:
            self.in_recovery = True
            self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                f'{proj_x:.3f},{proj_y:.3f},{dist_to_path:.3f},{self.nearest_path_idx},'
                                f'{xte:.3f},0.000,0,0,0.000,{self.stage},enter_recovery\n')
            self.log_file.flush()
        if self.in_recovery and dist_to_path < self.recovery_exit_dist:
            self.in_recovery = False
            self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                f'{proj_x:.3f},{proj_y:.3f},{dist_to_path:.3f},{self.nearest_path_idx},'
                                f'{xte:.3f},0.000,0,0,0.000,{self.stage},exit_recovery\n')
            self.log_file.flush()
        # ========== æ ‡å‡† Pure Pursuit æŽ§åˆ¶ ==========
        # é€‰æ‹©å‰è§†ç‚¹
        (lx, ly), path_curvature, lookahead_idx, nearest_idx = self._find_lookahead_point(lookahead)
        self.current_target_idx = lookahead_idx
        # è®¡ç®—前视点方向
        path_heading = atan2(ly - y, lx - x)
        # æ¨ªå‘误差修正
        xte_normalized = max(-1.0, min(1.0, xte / self.max_xte))
        xte_correction = -self.k_xte * xte_normalized * self.xte_gain_scale
        # æœŸæœ›èˆªå‘ = å‰è§†æ–¹å‘ + xte修正
        desired_heading = wrap_angle(path_heading + xte_correction)
        heading_err = wrap_angle(desired_heading - self.heading)
        # è®¡ç®—航向误差的变化率
        heading_err_d = (heading_err - self.last_heading_err) / dt if dt > 0 else 0
        self.last_heading_err = heading_err
        # ç§¯åˆ†é¡¹è®¡ç®—(带防饱和)
        self.heading_err_sum += heading_err * dt
        self.xte_sum += xte * dt
        self.heading_err_sum = max(-self.max_sum, min(self.max_sum, self.heading_err_sum))
        self.xte_sum = max(-self.max_sum, min(self.max_sum, self.xte_sum))
        # PID控制
        yawrate_cmd = (self.k_heading * heading_err +
                      self.k_heading_d * heading_err_d +
                      self.k_heading_i * self.heading_err_sum +
                      self.k_xte_i * self.xte_sum)
        # é€Ÿåº¦æŽ§åˆ¶ï¼šåŸºäºŽæ›²çŽ‡ã€èˆªå‘è¯¯å·®ã€æ¨ªå‘è¯¯å·®
        # 1) æ›²çŽ‡ç¼©æ”¾
        curv_scale = 1.0 / (1.0 + abs(path_curvature) * self.curv_gain_scale)
        # 2) èˆªå‘误差缩放
        heading_err_abs = abs(heading_err)
        err_scale = 1.0 / (1.0 + heading_err_abs * self.heading_err_gain_scale)
        # 3) æ¨ªå‘误差缩放
        xte_abs = abs(xte)
        xte_scale = 1.0 / (1.0 + xte_abs * self.xte_gain_scale)
        # æ··åˆç¼©æ”¾ï¼šå‡ ä½•平均 + æœ€å°å€¼
        geom_scale = (curv_scale * err_scale * xte_scale) ** (1.0/3.0)
        min_scale = min(curv_scale, err_scale, xte_scale)
        blend_scale = 0.8 * geom_scale + 0.2 * min_scale
        target_speed = self.base_speed * blend_scale
        # æ»¤æ³¢å’Œé™å¹…
        self.yawrate_filter = (self.yawrate_alpha * yawrate_cmd +
                              (1-self.yawrate_alpha) * self.yawrate_filter)
        max_yawrate = self.max_yawrate
        yawrate_cmd = max(-max_yawrate, min(max_yawrate, self.yawrate_filter))
        # æ¢å¤æ¨¡å¼è¦†ç›–(优先级更高)
        if self.in_recovery:
            # æ¢å¤æ¨¡å¼ï¼šæœå‘投影点
            path_heading = atan2(proj_y - y, proj_x - x)
            xte_normalized = max(-1.0, min(1.0, xte / self.max_xte))
            xte_correction = -self.k_xte * xte_normalized * self.xte_gain_scale
            desired_heading = wrap_angle(path_heading + xte_correction)
            heading_err = wrap_angle(desired_heading - self.heading)
            yawrate_cmd = self.k_heading * heading_err
            target_speed = self.recovery_speed
        # åŸºäºŽè·ç¦»ç»ˆç‚¹çš„距离调整速度
        in_slow_zone = final_dist < self.slow_zone
        in_final_approach = final_dist < self.final_approach_dist
        # é”™è¯¯æ¢å¤é€»è¾‘
        if abs(xte) > self.large_err_threshold * 1.5 or abs(heading_err) > pi/2.5:
            self.consecutive_large_err += 1
            if self.consecutive_large_err > self.max_large_err_count:
                # é‡ç½®åˆ°å®‰å…¨çŠ¶æ€
                self.consecutive_large_err = 0
                self.heading_err_sum = 0
                self.xte_sum = 0
        else:
            self.consecutive_large_err = 0
        # æŽ¥è¿‘终点时调整速度
        if not self.in_recovery:
            if in_slow_zone:
                dist_scale = max(0.3, min(1.0, final_dist / self.slow_zone))
                if in_final_approach:
                    dist_scale *= max(0.4, final_dist / self.final_approach_dist)
                target_speed *= dist_scale
            # åœ¨ç»ˆç‚¹é™„近增加精度
            if in_final_approach:
                target_speed *= 0.6
                if abs(xte) > 0.15 or abs(heading_err) > pi/10:
                    target_speed *= 0.7
        # å¼ºåˆ¶æœ€å°é€Ÿåº¦ï¼ˆé¿å…å®Œå…¨åœæ»žï¼‰
        if self.stage == self.STAGE_FOLLOW_PATH and not self.in_recovery:
            dynamic_min = self.min_follow_speed if in_slow_zone else self.min_follow_speed * 1.5
            if abs(heading_err) < 3.0 and target_speed < dynamic_min:
                target_x, target_y = lx, ly
                target_dist = hypot(target_x - x, target_y - y)
                self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                    f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},'
                                    f'{xte:.3f},{heading_err:.3f},0,0,'
                                    f'{dynamic_min:.3f},{self.stage},enforce_min_speed\n')
                self.log_file.flush()
                target_speed = dynamic_min
        # è½¯å¯åŠ¨ï¼šé™åˆ¶åŠ é€Ÿåº¦
        accel = self.accel * (0.5 if in_slow_zone else 1.0)
        speed_change = target_speed - self.current_speed
        max_change = accel * dt
        self.current_speed += max(-max_change, min(max_change, speed_change))
        # å€’车控制
        if self.stage == self.STAGE_GOTO_START:
            if abs(heading_err) > 2.9:
                self.current_speed = -self.max_reverse_mps * 0.5
        else:
            if abs(heading_err) > 2.9 and not self.in_recovery:
                self.current_speed = -self.max_reverse_mps * 0.4
            if self.current_speed < 0 and abs(heading_err) < 2.5:
                self.current_speed = 0.0
        # ç»ˆç‚¹æ£€æµ‹ï¼ˆç®€åŒ–,只基于距离和点索引)
        if final_dist < self.goal_tolerance:
            self.finished = True
            return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}}
        # æ‰©å±•完成条件:接近终点且误差小
        if (final_dist < self.goal_tolerance * 1.5 and
            self.current_target_idx >= self.n - 2 and
            abs(heading_err) < 0.3 and abs(xte) < 0.2):
            self.finished = True
            return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}}
        # map to signals [-100,100]
        # è®°å½•映射前的速度信息以便诊断
        log_t = timestamp if timestamp is not None else time.time()
        pre_map_speed = self.current_speed
        # è¯Šæ–­æ—¥å¿—(简化)
        target_x, target_y = lx, ly
        target_dist = hypot(target_x - x, target_y - y)
        pre_map_forward = int(round((pre_map_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0
        pre_map_forward = max(-100, min(100, pre_map_forward))
        pre_map_turn = int(round((yawrate_cmd / self.max_yawrate) * 100)) if self.max_yawrate != 0 else 0
        pre_map_turn = max(-100, min(100, pre_map_turn))
        self.log_file.write(f'{log_t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                            f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},'
                            f'{xte:.3f},{heading_err:.3f},{pre_map_forward},{pre_map_turn},{pre_map_speed:.3f},'
                            f'{self.stage},pre_map\n')
        self.log_file.flush()
        forward_signal = int(round((self.current_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0
        forward_signal = max(-100, min(100, forward_signal))
        # å¼ºåˆ¶æœ€å°å‰è¿›ä¿¡å·ï¼ˆé˜²æ­¢å®Œå…¨åœæ­¢ï¼‰
        if forward_signal == 0 and not self.finished and abs(heading_err) < 3.0:
            forced_signal = int(round((self.min_follow_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0
            if forced_signal != 0:
                target_x, target_y = lx, ly
                target_dist = hypot(target_x - x, target_y - y)
                self.log_file.write(f'{log_t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                                    f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},'
                                    f'{xte:.3f},{heading_err:.3f},{forced_signal},{int(round((yawrate_cmd / self.max_yawrate) * 100))},'
                                    f'{self.min_follow_speed:.3f},{self.stage},force_min_forward\n')
                self.log_file.flush()
                forward_signal = forced_signal
                self.current_speed = self.min_follow_speed
        turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) if self.max_yawrate != 0 else 0
        turn_signal = max(-100, min(100, turn_signal))
        status = 'finished' if self.finished else 'running'
        info = {
            'status': status,
            'pos': (x, y),
            'heading': self.heading,
            'desired_heading': desired_heading,
            'heading_err': heading_err,
            'current_speed': self.current_speed,
            'yawrate_cmd': yawrate_cmd,
            'target_xy': (lx, ly),
            'path_curvature': path_curvature,
            'final_dist': final_dist,
        }
        # ç®€åŒ–的日志记录
        t = log_t
        target_x, target_y = lx, ly
        target_dist = hypot(target_x - x, target_y - y)
        self.log_file.write(f'{t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},'
                            f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},'
                            f'{xte:.3f},{heading_err:.3f},{forward_signal},{turn_signal},{self.current_speed:.3f},'
                            f'{self.stage},{status}\n')
        self.log_file.flush()
        return {'forward': forward_signal, 'turn': turn_signal, 'info': info}
if __name__ == '__main__':
    # quick manual test
    import json
    path = [[0,0],[5,0],[10,0]]
    c = MowerController(path)
    c.update_gps((0, -1.0), 0.0)
    for i in range(20):
        out = c.compute_control()
        print(i, out)
        # fake forward motion
        f = out['forward'] / 100.0 * c.max_forward_mps
        hd = c.heading + (out['turn'] / 100.0) * c.max_yawrate * 0.1
        px = c.pos[0] + f * cos(hd) * 0.1
        py = c.pos[1] + f * sin(hd) * 0.1
        c.update_gps((px,py), hd)
python/path_planner.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,164 @@
"""
Simple path planner to generate approach path from mower start position to planned path start.
"""
import numpy as np
from math import hypot, atan2, cos, sin, pi
def plan_approach_path(start_pos, start_heading, target_pos, target_heading, method='line'):
    """
    Generate approach path from mower start to planned path start.
    Args:
        start_pos: (x, y) mower starting position
        start_heading: mower starting heading in radians
        target_pos: (x, y) planned path starting position
        target_heading: planned path starting heading in radians
        method: 'line' | 'dubins' | 'smooth'
    Returns:
        List of (x, y) waypoints forming the approach path
    """
    sx, sy = start_pos
    tx, ty = target_pos
    dist = hypot(tx - sx, ty - sy)
    if dist < 0.05:  # Already at start, no approach needed
        return [start_pos, target_pos]
    if method == 'line':
        # Simple straight line with waypoints every 0.5m
        num_points = max(3, int(dist / 0.5) + 1)
        path = []
        for i in range(num_points):
            t = i / (num_points - 1)
            x = sx + t * (tx - sx)
            y = sy + t * (ty - sy)
            path.append((x, y))
        return path
    elif method == 'smooth':
        # Smooth curve considering start and target headings
        # Use cubic bezier curve
        # Control points based on headings
        ctrl_dist = min(dist * 0.4, 2.0)  # Control point distance
        # Start control point along start heading
        c1x = sx + ctrl_dist * cos(start_heading)
        c1y = sy + ctrl_dist * sin(start_heading)
        # End control point along target heading (backwards)
        c2x = tx - ctrl_dist * cos(target_heading)
        c2y = ty - ctrl_dist * sin(target_heading)
        # Generate bezier curve points
        num_points = max(5, int(dist / 0.3) + 1)
        path = []
        for i in range(num_points):
            t = i / (num_points - 1)
            # Cubic bezier formula
            x = (1-t)**3 * sx + 3*(1-t)**2*t * c1x + 3*(1-t)*t**2 * c2x + t**3 * tx
            y = (1-t)**3 * sy + 3*(1-t)**2*t * c1y + 3*(1-t)*t**2 * c2y + t**3 * ty
            path.append((x, y))
        return path
    else:
        raise ValueError(f"Unknown method: {method}")
def compute_path_heading(path, idx=0):
    """Compute heading of path at given index."""
    if idx >= len(path) - 1:
        idx = len(path) - 2
    if idx < 0:
        idx = 0
    dx = path[idx + 1][0] - path[idx][0]
    dy = path[idx + 1][1] - path[idx][1]
    return atan2(dy, dx)
def combine_paths(approach_path, work_path):
    """
    Combine approach path and work path into single path with metadata.
    Returns:
        combined_path: List of (x, y) waypoints
        approach_end_idx: Index where approach path ends (work path starts)
    """
    # Remove duplicate point at junction if exists
    if len(approach_path) > 0 and len(work_path) > 0:
        if hypot(approach_path[-1][0] - work_path[0][0],
                 approach_path[-1][1] - work_path[0][1]) < 0.01:
            combined = approach_path[:-1] + work_path
            approach_end_idx = len(approach_path) - 1
        else:
            combined = approach_path + work_path
            approach_end_idx = len(approach_path)
    else:
        combined = approach_path + work_path
        approach_end_idx = len(approach_path)
    return combined, approach_end_idx
if __name__ == '__main__':
    # Test path planning
    import json
    import matplotlib.pyplot as plt
    # Load planned path
    with open('example_path.json', 'r') as f:
        work_path = json.load(f)
    # Mower starts 2m away from path start
    mower_start = (work_path[0][0] - 1.5, work_path[0][1] - 1.5)
    mower_heading = pi / 4  # 45 degrees
    # Plan path heading
    plan_heading = compute_path_heading(work_path, 0)
    # Generate approach paths with different methods
    approach_line = plan_approach_path(mower_start, mower_heading,
                                       work_path[0], plan_heading, 'line')
    approach_smooth = plan_approach_path(mower_start, mower_heading,
                                        work_path[0], plan_heading, 'smooth')
    # Plot comparison
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
    # Line method
    ax1.plot([p[0] for p in work_path], [p[1] for p in work_path],
             'b-', linewidth=2, label='Work Path')
    ax1.plot([p[0] for p in approach_line], [p[1] for p in approach_line],
             'r--', linewidth=2, marker='o', markersize=4, label='Approach (Line)')
    ax1.plot(mower_start[0], mower_start[1], 'go', markersize=10, label='Mower Start')
    ax1.arrow(mower_start[0], mower_start[1], 0.3*cos(mower_heading), 0.3*sin(mower_heading),
              head_width=0.15, head_length=0.1, fc='g', ec='g')
    ax1.set_xlabel('X (m)')
    ax1.set_ylabel('Y (m)')
    ax1.set_title('Line Approach')
    ax1.legend()
    ax1.grid(True)
    ax1.axis('equal')
    # Smooth method
    ax2.plot([p[0] for p in work_path], [p[1] for p in work_path],
             'b-', linewidth=2, label='Work Path')
    ax2.plot([p[0] for p in approach_smooth], [p[1] for p in approach_smooth],
             'r--', linewidth=2, marker='o', markersize=4, label='Approach (Smooth)')
    ax2.plot(mower_start[0], mower_start[1], 'go', markersize=10, label='Mower Start')
    ax2.arrow(mower_start[0], mower_start[1], 0.3*cos(mower_heading), 0.3*sin(mower_heading),
              head_width=0.15, head_length=0.1, fc='g', ec='g')
    ax2.set_xlabel('X (m)')
    ax2.set_ylabel('Y (m)')
    ax2.set_title('Smooth Approach')
    ax2.legend()
    ax2.grid(True)
    ax2.axis('equal')
    plt.tight_layout()
    plt.savefig('approach_path_test.png', dpi=150)
    print("Saved approach_path_test.png")
    print(f"Line approach: {len(approach_line)} points")
    print(f"Smooth approach: {len(approach_smooth)} points")
python/sim_offline.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,241 @@
"""
Offline simulation and path visualization
- Run complete trajectory with approach path
- Plot final path and control signals
- Save detailed logs for analysis
"""
import json
import time
from math import cos, sin, pi, hypot, atan2
import numpy as np
import matplotlib
# use non-interactive backend to avoid blocking GUI (run headless)
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from mower_controller import MowerController, wrap_angle
from path_planner import plan_approach_path, compute_path_heading, combine_paths
def simulate_full_path(work_path, mower_start_pos, mower_start_heading, params=None,
                       approach_method='smooth'):
    """
    Run complete simulation with approach path and return state history.
    Args:
        work_path: Planned work path [(x, y), ...]
        mower_start_pos: (x, y) mower starting position
        mower_start_heading: mower starting heading in radians
        params: Controller parameters dict
        approach_method: 'line' or 'smooth'
    """
    # Generate approach path from mower start to work path start
    work_path_heading = compute_path_heading(work_path, 0)
    approach_path = plan_approach_path(mower_start_pos, mower_start_heading,
                                       work_path[0], work_path_heading,
                                       method=approach_method)
    # Combine paths
    combined_path, approach_end_idx = combine_paths(approach_path, work_path)
    print(f"Generated approach path: {len(approach_path)} points")
    print(f"Work path: {len(work_path)} points")
    print(f"Combined path: {len(combined_path)} points (approach ends at idx {approach_end_idx})")
    # Initialize controller with combined path
    ctrl = MowerController(combined_path, params=params)
    # Initial state
    x, y = mower_start_pos
    heading = mower_start_heading
    t = 0.0
    dt = 1.0/74.0  # control period
    # History for plotting
    history = {
        't': [0.0],
        'x': [x],
        'y': [y],
        'heading': [heading],
        'forward': [0],
        'turn': [0],
        'approach_end_idx': approach_end_idx,  # Store for visualization
        'approach_path': approach_path,
        'work_path': work_path,
    }
    finished = False
    while not finished and t < 300.0:  # timeout after 300s
        # Update controller
        ctrl.update_gps((x, y), heading)
        ctrl.update_imu((0,0,0), (0,0,0))
        # Get control signals
        out = ctrl.compute_control(t)
        f_sig = out['forward']
        t_sig = out['turn']
        # Convert to physical quantities
        f_mps = (f_sig / 100.0) * ctrl.max_forward_mps
        yawrate = (t_sig / 100.0) * ctrl.max_yawrate
        # Update state
        heading = wrap_angle(heading + yawrate * dt)
        x = x + f_mps * cos(heading) * dt
        y = y + f_mps * sin(heading) * dt
        t += dt
        # Record
        history['t'].append(t)
        history['x'].append(x)
        history['y'].append(y)
        history['heading'].append(heading)
        history['forward'].append(f_sig)
        history['turn'].append(t_sig)
        # Check completion
        if out['info'].get('status') == 'finished':
            finished = True
            print(f'Reached goal in {t:.1f}s')
    if not finished:
        print('Timeout!')
    return history
def plot_results(history):
    """Plot path following results with approach and work paths."""
    approach_path = history['approach_path']
    work_path = history['work_path']
    approach_end_idx = history['approach_end_idx']
    fig = plt.figure(figsize=(15, 10))
    gs = fig.add_gridspec(3, 2)
    # Path plot (larger, spans two rows)
    ax_path = fig.add_subplot(gs[0:2, 0])
    # Plot approach path (dashed orange)
    apx = [p[0] for p in approach_path]
    apy = [p[1] for p in approach_path]
    ax_path.plot(apx, apy, '--', color='orange', lw=2, label='Approach Path', alpha=0.7)
    ax_path.scatter(apx, apy, c='orange', s=15, alpha=0.5)
    # Plot work path (solid black)
    wpx = [p[0] for p in work_path]
    wpy = [p[1] for p in work_path]
    ax_path.plot(wpx, wpy, '-k', lw=2, label='Work Path')
    ax_path.scatter(wpx, wpy, c='k', s=20)
    # Plot actual trajectory
    ax_path.plot(history['x'], history['y'], '-r', lw=1.5, label='Actual', alpha=0.8)
    # Add markers
    ax_path.plot(history['x'][0], history['y'][0], 'go', markersize=12,
                label='Mower Start', zorder=10)
    ax_path.plot(work_path[0][0], work_path[0][1], 'bs', markersize=10,
                label='Work Path Start', zorder=10)
    ax_path.plot(history['x'][-1], history['y'][-1], 'ro', markersize=10,
                label='End', zorder=10)
    ax_path.set_aspect('equal')
    ax_path.grid(True, alpha=0.3)
    ax_path.set_xlabel('X (m)')
    ax_path.set_ylabel('Y (m)')
    ax_path.set_title('Path Following Results (Approach + Work)')
    ax_path.legend(loc='best')
    # XY position vs time
    ax_xy = fig.add_subplot(gs[0:2, 1])
    ax_xy.plot(history['t'], history['x'], '-b', label='X')
    ax_xy.plot(history['t'], history['y'], '-r', label='Y')
    ax_xy.set_xlabel('Time (s)')
    ax_xy.set_ylabel('Position (m)')
    ax_xy.grid(True)
    ax_xy.legend()
    # Control signals
    ax_ctrl = fig.add_subplot(gs[2, :])
    ax_ctrl.plot(history['t'], history['forward'], '-b', label='Forward')
    ax_ctrl.plot(history['t'], history['turn'], '-r', label='Turn')
    ax_ctrl.set_xlabel('Time (s)')
    ax_ctrl.set_ylabel('Control Signal')
    ax_ctrl.grid(True)
    ax_ctrl.legend()
    plt.tight_layout()
    plt.savefig('simulation_results.png', dpi=300, bbox_inches='tight')
    plt.show()
def main():
    # Load planned work path
    try:
        with open('example_path.json', 'r') as f:
            work_path = json.load(f)
    except Exception as e:
        print(f'Failed to load path: {e}')
        return
    # Define mower starting position (offset from work path start)
    mower_start_pos = (work_path[0][0] + 3.5, work_path[0][1] + 1.0)
    mower_start_heading = pi / 4  # 45 degrees
    print(f"Mower starts at: {mower_start_pos}, heading: {mower_start_heading:.2f} rad")
    print(f"Work path starts at: {work_path[0]}")
    # Run simulation with stable high-precision parameters
    params = {
        # === æ ¸å¿ƒæŽ§åˆ¶å¢žç›Šï¼ˆä¿®æ­£åŽï¼‰ ===
        'k_heading': 2.0,            # èˆªå‘增益
        'k_xte': 0.8,                # æ¨ªå‘误差增益:2.0 â†’ 0.8 (大幅降低以防止过度修正)
        'k_heading_d': 0.1,          # èˆªå‘微分增益
        'k_heading_i': 0.015,        # èˆªå‘积分增益
        'k_xte_i': 0.02,             # æ¨ªå‘积分增益
        # === é€Ÿåº¦æŽ§åˆ¶ ===
        'base_speed': 0.8,           # åŸºç¡€é€Ÿåº¦ï¼š0.6 â†’ 0.8
        'max_forward_mps': 1.2,      # æœ€å¤§é€Ÿåº¦ï¼š0.95 â†’ 1.2
        'min_follow_speed': 0.40,    # æœ€å°é€Ÿåº¦ï¼š0.30 â†’ 0.40
        'max_reverse_mps': 0.25,
        'max_yawrate': pi/2.5,       # è½¬å‘速率:pi/3 â†’ pi/2.5 (更快转向)
        # === å‰è§†è·ç¦» ===
        'lookahead_min': 0.35,       # æœ€å°å‰è§†
        'lookahead_max': 1.2,        # æœ€å¤§å‰è§†
        'lookahead_time': 2.0,       # å‰è§†æ—¶é—´ç³»æ•°
        # === è¯¯å·®å®¹å¿åº¦ ===
        'max_xte':1.5,              # æœ€å¤§æ¨ªå‘误差:0.8 â†’ 1.0 (增大以减少归一化放大)
        'goal_tolerance': 0.1,       # ç›®æ ‡å®¹å¿
        'large_err_threshold': 0.5,  # å¤§è¯¯å·®é˜ˆå€¼
        # === æ»¤æ³¢å‚æ•° ===
        'yawrate_alpha': 0.85,       # è½¬å‘滤波
        # === é€Ÿåº¦ç¼©æ”¾å¢žç›Š ===
        'curv_gain_scale': 2.0,      # æ›²çŽ‡ç¼©æ”¾
        'heading_err_gain_scale': 1.5,  # èˆªå‘误差缩放
        'xte_gain_scale': 1.0,       # æ¨ªå‘误差缩放:1.2 â†’ 1.0 (移除额外放大)
        # === ç»ˆç‚¹å‡é€ŸåŒºåŸŸ ===
        'slow_zone': 1.5,
        'final_approach_dist': 0.8,
        # === èµ·ç‚¹é˜¶æ®µ ===
        'start_min_speed': 0.28,
        'start_slow_speed': 0.38,
        # === æ¢å¤æ¨¡å¼ ===
        'recovery_dist': 2.5,
        'recovery_exit_dist': 0.6,
        'recovery_speed': 0.6,
    }
    history = simulate_full_path(work_path, mower_start_pos, mower_start_heading,
                                 params, approach_method='smooth')
    plot_results(history)
if __name__ == '__main__':
    main()
python/view_simulation.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,22 @@
"""
Simple image viewer for simulation results
"""
from PIL import Image
import matplotlib.pyplot as plt
# Open the saved simulation image
img = Image.open('simulation_results.png')
# Display
plt.figure(figsize=(16, 10))
plt.imshow(img)
plt.axis('off')
plt.title('Simulation Results with Approach Path')
plt.tight_layout()
plt.savefig('view_results.png', dpi=150, bbox_inches='tight')
print("Saved view_results.png")
print(f"Image size: {img.size}")
print("\n✓ Approach path (orange dashed) + Work path (black) are now visualized")
print("✓ Mower starts from offset position and follows approach path to work path start")
print("\n⚠️ Note: Trajectory still has issues after ~40s due to segment progression bug")
print("   This is the existing Pure Pursuit restoration issue, not related to approach path")
python/viz.py
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,147 @@
"""
viz.py
读取 sim_log.csv(由 test.py ç”Ÿæˆï¼‰å¹¶ç”¨ matplotlib åŠ¨ç”»æ˜¾ç¤ºï¼š
- ä¸»å›¾ï¼šè·¯å¾„点(`example_path.json`)和割草机当前位置/航向箭头
- ä¸‹æ–¹ä¸¤ä¸ªå­å›¾ï¼šforward_sig ä¸Ž turn_sig éšæ—¶é—´çš„æ›²çº¿ï¼Œå½“前帧用竖线标示
运行:
    python viz.py
如果没有 `sim_log.csv`,脚本会提示先运行 `test.py` ç”Ÿæˆæ•°æ®ã€‚
"""
import os
import json
import csv
import sys
from math import cos, sin
import numpy as np
try:
    import matplotlib.pyplot as plt
    from matplotlib.animation import FuncAnimation
except Exception as e:
    print('matplotlib required. Install with: pip install matplotlib')
    raise
SIM_CSV = 'sim_log.csv'
PATH_FILE = 'example_path.json'
def load_sim(csv_path=SIM_CSV):
    if not os.path.exists(csv_path):
        print(f"Sim log '{csv_path}' not found. Run test.py first to generate it.")
        sys.exit(1)
    t, xs, ys, headings, forward_sig, turn_sig = [], [], [], [], [], []
    with open(csv_path, 'r', newline='') as f:
        reader = csv.DictReader(f)
        for row in reader:
            t.append(float(row['t']))
            xs.append(float(row['x']))
            ys.append(float(row['y']))
            headings.append(float(row['heading']))
            forward_sig.append(int(row['forward_sig']))
            turn_sig.append(int(row['turn_sig']))
    return {'t': t, 'x': xs, 'y': ys, 'heading': headings, 'forward': forward_sig, 'turn': turn_sig}
def load_path(pth=PATH_FILE):
    if not os.path.exists(pth):
        return []
    with open(pth, 'r') as f:
        return json.load(f)
def animate_sim(simdata, path):
    t = simdata['t']
    x = simdata['x']
    y = simdata['y']
    h = simdata['heading']
    fwd = simdata['forward']
    trn = simdata['turn']
    fig = plt.figure(figsize=(10, 8))
    gs = fig.add_gridspec(3, 1, height_ratios=[3, 1, 1])
    ax_map = fig.add_subplot(gs[0])
    ax_f = fig.add_subplot(gs[1])
    ax_t = fig.add_subplot(gs[2])
    # map plot
    if path:
        px = [p[0] for p in path]
        py = [p[1] for p in path]
        ax_map.plot(px, py, '-k', lw=1, label='planned path')
        ax_map.scatter(px, py, c='k', s=10)
    line_pos, = ax_map.plot([], [], '-r', lw=2, label='actual')
    point_pos, = ax_map.plot([], [], 'ro')
    # heading arrow using quiver (initialize at first pose if available)
    if len(x) > 0 and len(h) > 0:
        quiv = ax_map.quiver([x[0]], [y[0]], [cos(h[0])], [sin(h[0])], angles='xy', scale_units='xy', scale=4, color='b')
    else:
        quiv = ax_map.quiver([0], [0], [1], [0], angles='xy', scale_units='xy', scale=4, color='b')
    ax_map.set_aspect('equal', 'box')
    ax_map.set_title('Mower path and pose')
    ax_map.legend()
    # forward subplot
    ax_f.plot(t, fwd, color='gray', alpha=0.5)
    f_line = ax_f.axvline(t[0], color='r')
    ax_f.set_ylabel('forward_sig')
    # turn subplot
    ax_t.plot(t, trn, color='gray', alpha=0.5)
    t_line = ax_t.axvline(t[0], color='r')
    ax_t.set_ylabel('turn_sig')
    ax_t.set_xlabel('time (s)')
    # set map limits
    margin = 1.0
    all_x = x + ( [p[0] for p in path] if path else [] )
    all_y = y + ( [p[1] for p in path] if path else [] )
    if all_x and all_y:
        ax_map.set_xlim(min(all_x)-margin, max(all_x)+margin)
        ax_map.set_ylim(min(all_y)-margin, max(all_y)+margin)
    def init():
        line_pos.set_data([], [])
        point_pos.set_data([], [])
        return line_pos, point_pos, quiv, f_line, t_line
    def update(i):
        # i is frame index
        xi = x[:i+1]
        yi = y[:i+1]
        line_pos.set_data(xi, yi)
        point_pos.set_data([x[i]], [y[i]])
        # update quiver in-place
        hx = cos(h[i])
        hy = sin(h[i])
        # set new offset and vector
        quiv.set_offsets(np.array([[x[i], y[i]]]))
        quiv.set_UVC(np.array([hx]), np.array([hy]))
        # update vertical lines
        f_line.set_xdata(t[i])
        t_line.set_xdata(t[i])
        return line_pos, point_pos, quiv, f_line, t_line
    interval_ms = max(10, int(1000.0 / 74.0))
    ani = FuncAnimation(fig, update, frames=len(t), init_func=init, blit=False, interval=interval_ms)
    plt.tight_layout()
    plt.show()
def main():
    sim = load_sim()
    path = load_path()
    animate_sim(sim, path)
if __name__ == '__main__':
    main()