| | |
| | | /STM32H743/MDK-ARM/STM32H743/*.o |
| | | /STM32H743/MDK-ARM/STM32H743/*.htm |
| | | /STM32H743/MDK-ARM/STM32H743/*.axf |
| | | /STM32H743/MDK-ARM/STM32H743/*.hex |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | """Analyze path_heading from log""" |
| | | from math import atan2, pi |
| | | |
| | | with open('controller_log.csv') as f: |
| | | lines = [l.strip() for l in f.readlines()[1:]] |
| | | |
| | | filtered = [l.split(',') for l in lines if len(l.split(',')) >= 9] |
| | | |
| | | print("Analysis at key time points:") |
| | | for target_t in [10.0, 20.0, 30.0]: |
| | | rows = [p for p in filtered if abs(float(p[0]) - target_t) < 0.05] |
| | | if rows: |
| | | r = rows[0] |
| | | t = float(r[0]) |
| | | x, y = float(r[1]), float(r[2]) |
| | | heading = float(r[3]) |
| | | tx, ty = float(r[4]), float(r[5]) |
| | | xte = float(r[8]) |
| | | heading_err = float(r[9]) |
| | | |
| | | path_heading = atan2(ty-y, tx-x) |
| | | |
| | | print(f"\nt={t:.1f}s:") |
| | | print(f" pos=({x:.3f}, {y:.3f}), heading={heading:.3f} rad = {heading*180/pi:.1f} deg") |
| | | print(f" target=({tx:.3f}, {ty:.3f})") |
| | | print(f" path_heading={path_heading:.3f} rad = {path_heading*180/pi:.1f} deg") |
| | | print(f" xte={xte:.3f}, heading_err={heading_err:.3f}") |
| | | print(f" => desired_heading = path_heading + xte_correction") |
| | | print(f" = {path_heading:.3f} + (-1.2 * {xte:.3f})") |
| | | print(f" = {path_heading - 1.2*xte:.3f} rad = {(path_heading - 1.2*xte)*180/pi:.1f} deg") |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | """ |
| | | Check trajectory with approach path visualization |
| | | """ |
| | | import json |
| | | |
| | | # Load controller log |
| | | lines = [] |
| | | with open('controller_log.csv', 'r') as f: |
| | | for line in f: |
| | | lines.append(line.strip()) |
| | | |
| | | # Find reached_start marker |
| | | reached_start_idx = None |
| | | for i, line in enumerate(lines): |
| | | if 'reached_start' in line: |
| | | reached_start_idx = i |
| | | print(f"Found reached_start at line {i+1}") |
| | | break |
| | | |
| | | if reached_start_idx is None: |
| | | print("No reached_start found") |
| | | exit(1) |
| | | |
| | | # Parse and display trajectory |
| | | print("\n=== TRAJECTORY ANALYSIS WITH APPROACH PATH ===\n") |
| | | |
| | | # Show first 20 frames after reached_start |
| | | print("First 20 frames after reached_start (now following combined path):") |
| | | for i in range(reached_start_idx + 1, min(reached_start_idx + 21, len(lines))): |
| | | parts = lines[i].split(',') |
| | | if len(parts) >= 8: |
| | | t = float(parts[0]) |
| | | x, y = float(parts[1]), float(parts[2]) |
| | | heading = float(parts[3]) |
| | | target_x, target_y = float(parts[4]), float(parts[5]) |
| | | target_idx = int(parts[7]) |
| | | xte = float(parts[8]) if len(parts) > 8 else 0.0 |
| | | h_err = float(parts[9]) if len(parts) > 9 else 0.0 |
| | | print(f"t={t:6.3f} pos=({x:6.3f},{y:6.3f}) h={heading:5.3f} target=({target_x:6.3f},{target_y:6.3f}) idx={target_idx:3d} xte={xte:6.3f} h_err={h_err:6.3f}") |
| | | |
| | | # Sample every 10 seconds |
| | | print("\n\nPosition every 10 seconds:") |
| | | for target_t in [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110]: |
| | | for line in lines[reached_start_idx+1:]: |
| | | parts = line.split(',') |
| | | if len(parts) >= 8: |
| | | t = float(parts[0]) |
| | | if abs(t - target_t) < 0.1: # within 0.1s |
| | | x, y = float(parts[1]), float(parts[2]) |
| | | heading = float(parts[3]) |
| | | print(f"t={t:6.1f}s: pos=({x:7.3f},{y:7.3f}) heading={heading:6.3f}") |
| | | break |
| | | |
| | | # Check if completed |
| | | print("\n\nCompletion status:") |
| | | last_line = lines[-1] |
| | | parts = last_line.split(',') |
| | | if len(parts) >= 14: |
| | | t = float(parts[0]) |
| | | x, y = float(parts[1]), float(parts[2]) |
| | | stage = parts[13] |
| | | print(f"Last frame: t={t:.1f}s, pos=({x:.3f},{y:.3f}), stage={stage}") |
| | | if t >= 119.0: |
| | | print("â ï¸ TIMEOUT - did not complete in 120s") |
| | | else: |
| | | print("â Completed successfully") |
| | | |
| | | # Load paths and check indices |
| | | print("\n\n=== PATH INFORMATION ===") |
| | | with open('example_path.json', 'r') as f: |
| | | work_path = json.load(f) |
| | | |
| | | print(f"Work path: {len(work_path)} points ({len(work_path)-1} segments)") |
| | | print(f"Work path range: x=[{min(p[0] for p in work_path):.1f}, {max(p[0] for p in work_path):.1f}], " |
| | | f"y=[{min(p[1] for p in work_path):.1f}, {max(p[1] for p in work_path):.1f}]") |
| | | |
| | | # Estimate approach path length |
| | | print(f"\nNote: Combined path includes approach path + work path") |
| | | print(f"Check simulation_results.png to see both paths visualized") |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | """Check path segment headings""" |
| | | from math import atan2, pi |
| | | |
| | | approach_points = [ |
| | | (-1.5, -1.0), |
| | | (-1.262, -0.749), |
| | | (-1.045, -0.514), |
| | | (-0.829, -0.309), |
| | | (-0.596, -0.146), |
| | | (-0.326, -0.039), |
| | | (0, 0) |
| | | ] |
| | | |
| | | print("Approach path segment headings:") |
| | | for i in range(len(approach_points)-1): |
| | | p1, p2 = approach_points[i], approach_points[i+1] |
| | | heading = atan2(p2[1]-p1[1], p2[0]-p1[0]) |
| | | print(f"Segment {i}->{i+1}: heading={heading:.3f} rad = {heading*180/pi:.1f} deg") |
| | | |
| | | print("\nWork path segment headings (first few):") |
| | | work_points = [(0, 0), (10, 0), (10, 1), (0, 1)] |
| | | for i in range(len(work_points)-1): |
| | | p1, p2 = work_points[i], work_points[i+1] |
| | | heading = atan2(p2[1]-p1[1], p2[0]-p1[0]) |
| | | print(f"Segment {i}->{i+1}: heading={heading:.3f} rad = {heading*180/pi:.1f} deg") |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | #!/usr/bin/env python |
| | | # æ£æ¥è½¨è¿¹è¯¦ç»æ°æ® |
| | | |
| | | with open('controller_log.csv', 'r') as f: |
| | | lines = f.readlines() |
| | | |
| | | # æ¾å° reached_start |
| | | reached_idx = None |
| | | for i, line in enumerate(lines): |
| | | if 'reached_start' in line: |
| | | reached_idx = i |
| | | break |
| | | |
| | | if reached_idx: |
| | | print(f"Found reached_start at line {reached_idx}") |
| | | |
| | | # å20帧çè¯¦ç»æ°æ® |
| | | print("\nFirst 20 frames after reached_start:") |
| | | count = 0 |
| | | for i in range(reached_idx + 1, min(reached_idx + 100, len(lines))): |
| | | if 'running' in lines[i]: |
| | | parts = lines[i].strip().split(',') |
| | | if len(parts) >= 11: |
| | | t, x, y, heading, tx, ty, tdist, tidx, xte, herr = parts[:10] |
| | | print(f"t={t:>6s} pos=({x:>6s},{y:>6s}) h={heading:>5s} target=({tx:>6s},{ty:>6s}) idx={tidx:>3s} xte={xte:>6s} h_err={herr:>6s}") |
| | | count += 1 |
| | | if count >= 20: |
| | | break |
| | | |
| | | # å¤ä¸ªæ¶é´ç¹çä½ç½® |
| | | print("\n\nPosition every 10 seconds:") |
| | | target_times = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120] |
| | | for target_t in target_times: |
| | | for i in range(reached_idx + 1, len(lines)): |
| | | if 'running' in lines[i]: |
| | | parts = lines[i].strip().split(',') |
| | | if len(parts) >= 3: |
| | | t = float(parts[0]) |
| | | if t >= target_t: |
| | | x, y, heading = parts[1], parts[2], parts[3] |
| | | print(f"t={t:>6.1f}s: pos=({x:>7s},{y:>7s}) heading={heading:>6s}") |
| | | break |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | """Debug desired_heading calculation""" |
| | | from math import atan2, pi |
| | | |
| | | def wrap_angle(a): |
| | | while a > pi: |
| | | a -= 2 * pi |
| | | while a < -pi: |
| | | a += 2 * pi |
| | | return a |
| | | |
| | | # From log at t=10s |
| | | x, y = -0.282, -0.638 |
| | | heading = -0.470 |
| | | tx, ty = 0.000, 0.000 |
| | | xte = 0.573 |
| | | heading_err_log = -0.095 |
| | | |
| | | # Calculate path_heading |
| | | path_heading = atan2(ty - y, tx - x) |
| | | print(f"path_heading = atan2({ty} - {y}, {tx} - {x}) = atan2({ty-y:.3f}, {tx-x:.3f}) = {path_heading:.3f}") |
| | | |
| | | # Calculate xte_correction |
| | | k_xte = 1.2 |
| | | max_xte = 1.0 |
| | | xte_gain_scale = 1.0 |
| | | xte_normalized = max(-1.0, min(1.0, xte / max_xte)) |
| | | xte_correction = -k_xte * xte_normalized * xte_gain_scale |
| | | print(f"xte_correction = -k_xte * xte_normalized * xte_gain_scale") |
| | | print(f" = -{k_xte} * {xte_normalized:.3f} * {xte_gain_scale}") |
| | | print(f" = {xte_correction:.3f}") |
| | | |
| | | # Calculate desired_heading |
| | | desired_heading_raw = path_heading + xte_correction |
| | | desired_heading = wrap_angle(desired_heading_raw) |
| | | print(f"desired_heading = wrap_angle({path_heading:.3f} + {xte_correction:.3f})") |
| | | print(f" = wrap_angle({desired_heading_raw:.3f})") |
| | | print(f" = {desired_heading:.3f}") |
| | | |
| | | # Calculate heading_err |
| | | heading_err_calc = wrap_angle(desired_heading - heading) |
| | | print(f"\nheading_err = wrap_angle({desired_heading:.3f} - {heading:.3f})") |
| | | print(f" = wrap_angle({desired_heading - heading:.3f})") |
| | | print(f" = {heading_err_calc:.3f}") |
| | | |
| | | print(f"\n heading_err from log: {heading_err_log:.3f}") |
| | | print(f"heading_err calculated: {heading_err_calc:.3f}") |
| | | print(f" Difference: {abs(heading_err_log - heading_err_calc):.3f}") |
| | | |
| | | if abs(heading_err_log - heading_err_calc) > 0.1: |
| | | print("\nâ ï¸ WARNING: Significant difference detected!") |
| | | print("Either:") |
| | | print("1. xte_correction is not being applied") |
| | | print("2. Different target point is being used") |
| | | print("3. Log data is from a different code path") |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | """Debug Pure Pursuit logic""" |
| | | import json |
| | | from math import pi, hypot, atan2, cos, sin |
| | | from path_planner import plan_approach_path, compute_path_heading, combine_paths |
| | | from mower_controller import MowerController |
| | | |
| | | # Load work path |
| | | work_path = json.load(open('example_path.json')) |
| | | mower_start_pos = (work_path[0][0] - 1.5, work_path[0][1] - 1.0) |
| | | mower_start_heading = pi / 4 |
| | | |
| | | # Generate approach path |
| | | work_path_heading = compute_path_heading(work_path, 0) |
| | | approach_path = plan_approach_path(mower_start_pos, mower_start_heading, |
| | | work_path[0], work_path_heading, |
| | | method='smooth') |
| | | |
| | | # Combine paths |
| | | combined_path, approach_end_idx = combine_paths(approach_path, work_path) |
| | | |
| | | print(f"Combined path: {len(combined_path)} points") |
| | | print(f"First 10 points:") |
| | | for i in range(min(10, len(combined_path))): |
| | | print(f" {i}: {combined_path[i]}") |
| | | |
| | | # Initialize controller |
| | | ctrl = MowerController(combined_path) |
| | | print(f"\nDensified path: {len(ctrl.path)} points") |
| | | print(f"First 20 densified points:") |
| | | for i in range(min(20, len(ctrl.path))): |
| | | print(f" {i}: {ctrl.path[i]}") |
| | | |
| | | # Simulate a few steps |
| | | ctrl.update_gps(pos_xy=mower_start_pos, heading=mower_start_heading) |
| | | ctrl.stage = ctrl.STAGE_FOLLOW_PATH # Skip goto_start |
| | | |
| | | print("\n=== Testing Pure Pursuit Logic ===") |
| | | for step in range(5): |
| | | result = ctrl.compute_control(timestamp=step * 0.014) |
| | | x, y = ctrl.pos |
| | | target_idx = ctrl.current_target_idx if hasattr(ctrl, 'current_target_idx') else -1 |
| | | if target_idx >= 0 and target_idx < len(ctrl.path): |
| | | tx, ty = ctrl.path[target_idx] |
| | | target_heading = atan2(ty - y, tx - x) |
| | | print(f"\nStep {step}:") |
| | | print(f" Pos: ({x:.3f}, {y:.3f}), heading: {ctrl.heading:.3f}") |
| | | print(f" Target idx: {target_idx}, target: ({tx:.3f}, {ty:.3f})") |
| | | print(f" Target heading: {target_heading:.3f}") |
| | | print(f" Command: forward={result['forward']}, turn={result['turn']}") |
| | | |
| | | # Simulate motion (simplified kinematic model) |
| | | dt = 0.014 |
| | | forward_mps = result['forward'] / 100.0 * ctrl.max_forward_mps |
| | | yawrate = result['turn'] / 100.0 * ctrl.max_yawrate |
| | | |
| | | # Update heading |
| | | new_heading = ctrl.heading + yawrate * dt |
| | | |
| | | # Update position |
| | | vx = forward_mps * cos(new_heading) |
| | | vy = forward_mps * sin(new_heading) |
| | | new_x = x + vx * dt |
| | | new_y = y + vy * dt |
| | | |
| | | ctrl.update_gps(pos_xy=(new_x, new_y), heading=new_heading) |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | import csv, json, math |
| | | |
| | | with open('example_path.json') as f: |
| | | path = json.load(f) |
| | | start = tuple(path[0]) |
| | | |
| | | rows = [] |
| | | with open('controller_log.csv') as f: |
| | | reader = csv.DictReader(f) |
| | | for r in reader: |
| | | rows.append(r) |
| | | |
| | | first_follow_idx = None |
| | | for i, r in enumerate(rows): |
| | | if r.get('stage','') == 'follow_path': |
| | | first_follow_idx = i |
| | | break |
| | | |
| | | if first_follow_idx is not None: |
| | | print('Found follow_path at row', first_follow_idx, 't=', rows[first_follow_idx]['t']) |
| | | for j in range(max(0, first_follow_idx-5), first_follow_idx+5): |
| | | r = rows[j] |
| | | x = float(r['x']); y = float(r['y']); heading = float(r['heading']) |
| | | dx = start[0]-x; dy = start[1]-y |
| | | desired = math.atan2(dy, dx) |
| | | h_err = (desired - heading + math.pi) % (2*math.pi) - math.pi |
| | | print(j, r['t'], r['stage'], r['status'], f"pos=({x:.3f},{y:.3f})", f"dist={(math.hypot(dx,dy)):.3f}", f"h_err={h_err:.3f}") |
| | | else: |
| | | print('No follow_path in log. Scanning for closest approach to start...') |
| | | min_dist = float('inf') |
| | | min_idx = None |
| | | for i, r in enumerate(rows): |
| | | x = float(r['x']); y = float(r['y']); |
| | | d = math.hypot(start[0]-x, start[1]-y) |
| | | if d < min_dist: |
| | | min_dist = d; min_idx = i |
| | | print('min dist to start =', min_dist, 'at row', min_idx, 't=', rows[min_idx]['t']) |
| | | print('Nearby log lines:') |
| | | for j in range(max(0, min_idx-10), min_idx+11): |
| | | r = rows[j] |
| | | x = float(r['x']); y = float(r['y']); heading = float(r['heading']) |
| | | dx = start[0]-x; dy = start[1]-y |
| | | desired = math.atan2(dy, dx) |
| | | h_err = (desired - heading + math.pi) % (2*math.pi) - math.pi |
| | | print(j, r['t'], r['stage'], r['status'], f"pos=({x:.3f},{y:.3f})", f"dist={(math.hypot(dx,dy)):.3f}", f"h_err={h_err:.3f}") |
| | | |
| | | # also print counts of key statuses |
| | | from collections import Counter |
| | | cnt = Counter((r['stage'], r['status']) for r in rows) |
| | | print('\nStage/Status counts (top 20):') |
| | | for k,v in cnt.most_common(20): |
| | | print(k, v) |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | """ |
| | | MowerController |
| | | |
| | | æ§å¶å¨å设ä¸å¥çº¦ï¼ |
| | | - åæ ç³»ï¼XY å¹³é¢åä½ä¸ºç±³ï¼RTK æä¾ï¼ï¼Z 忽ç¥ç¨äºå¹³é¢è·è¸ªã |
| | | - èªåï¼headingï¼ï¼å¼§åº¦ï¼èå´ä»»æï¼æ§å¶å¨ä¼å¤çç¯ç»ã |
| | | - GPS é¢çï¼ä½ç½®/é度/heading/attitudeï¼çº¦ 10Hzï¼IMUï¼å é度ãè§é度ï¼100Hzã |
| | | - æ§å¶è¾åºé¢çï¼74Hzï¼è¿å两个信å·ï¼forwardï¼-100..100ï¼å¯¹åº -1..+1 m/sï¼å turnï¼-100..100ï¼å¯¹åº -pi/2..+pi/2 rad/sï¼æ£ä¸ºå·¦è½¬ï¼ã |
| | | - è·¯å¾æä»¶ï¼JSON å表 [[x,y],[x,y],...] |
| | | |
| | | 设计说æï¼ |
| | | - 使ç¨çº¯è¿½è¸ªï¼pure pursuitï¼æ¹æ³éååè§ç¹ï¼å¹¶åºäºè§åº¦è¯¯å·®ä½¿ç¨ P æ§å¶è®¡ç®ææåèªéçï¼rad/sï¼ã |
| | | - åè¿é度ä¸åèªè¯¯å·®è¦åï¼åèªè¯¯å·®å¤§æ¶éä½åè¿é度以ä¿è¯å¯è½¬å¼¯è½åã |
| | | |
| | | æ¥å£ï¼ |
| | | - controller = MowerController(path_points, params) |
| | | - controller.update_gps(pos_xy, heading, vel_xy, timestamp) |
| | | - controller.update_imu(accel3, gyro3, timestamp) |
| | | - controller.compute_control(timestamp) -> {'forward': int, 'turn': int, 'info': {...}} |
| | | |
| | | """ |
| | | from math import atan2, hypot, sin, cos, pi, acos |
| | | import time |
| | | |
| | | |
| | | def wrap_angle(a): |
| | | # wrap to [-pi, pi) |
| | | while a >= pi: |
| | | a -= 2 * pi |
| | | while a < -pi: |
| | | a += 2 * pi |
| | | return a |
| | | |
| | | |
| | | class MowerController: |
| | | def __init__(self, path_points, params=None): |
| | | # path_points: list of [x,y] |
| | | # åå§è·¯å¾ç¹ |
| | | original_path = [tuple(p) for p in path_points] |
| | | if len(original_path) == 0: |
| | | raise ValueError("path_points cannot be empty") |
| | | |
| | | # ä¿ååå§è·¯å¾ç¨äºæç¹æ£æµ |
| | | self.original_path = original_path |
| | | |
| | | # è·¯å¾å å¯ï¼densifyï¼ï¼é¿å
馿®µè¿é¿å¯¼è´çº¯è¿½è¸ªåè§ç¹è·³å°è¿å¤ |
| | | p_params = params or {} |
| | | max_seg_len = p_params.get('max_segment_length', 0.6) # é»è®¤å°é¿æ®µåå为 <=0.6m |
| | | densified = [] |
| | | for i in range(len(original_path) - 1): |
| | | x1, y1 = original_path[i] |
| | | x2, y2 = original_path[i + 1] |
| | | densified.append((x1, y1)) |
| | | dx = x2 - x1 |
| | | dy = y2 - y1 |
| | | seg_len = hypot(dx, dy) |
| | | if seg_len > max_seg_len and seg_len > 1e-6: |
| | | # éè¦æå
¥è¥å¹²ä¸é´ç¹ï¼ä¸å
å«ç»ç¹ï¼ç»ç¹ä¸ä¸å¾ªç¯æ·»å ï¼ |
| | | steps = int(seg_len // max_seg_len) |
| | | # 使ç¨ååçº¿æ§æå¼ï¼1..stepsï¼ï¼é²æ¢é¦æ®µè¿é¿çº¯è¿½è¸ªç´æ¥è·³è¿ |
| | | for k in range(1, steps + 1): |
| | | ratio = k / (steps + 1) |
| | | densified.append((x1 + dx * ratio, y1 + dy * ratio)) |
| | | # æ·»å æåä¸ä¸ªç¹ |
| | | densified.append(original_path[-1]) |
| | | self.path = densified |
| | | self.n = len(self.path) |
| | | # é¢è®¡ç®æ¯æ®µé¿åº¦å累积é¿åº¦ç¨äºè¿åº¦è®¡ç® |
| | | self.segment_lengths = [] |
| | | self.cum_lengths = [0.0] |
| | | for i in range(self.n - 1): |
| | | ax, ay = self.path[i] |
| | | bx, by = self.path[i+1] |
| | | L = hypot(bx-ax, by-ay) |
| | | self.segment_lengths.append(L) |
| | | self.cum_lengths.append(self.cum_lengths[-1] + L) |
| | | self.total_length = self.cum_lengths[-1] if self.cum_lengths else 0.0 |
| | | # è¿åº¦åè°æ§ä¿æ¤åé |
| | | self.last_progress = 0.0 |
| | | |
| | | # parameters and gains |
| | | p = params or {} |
| | | self.max_forward_mps = p.get('max_forward_mps', 0.6) # æ´ä¿å®çé度 |
| | | self.max_reverse_mps = p.get('max_reverse_mps', 0.2) # ä¿å®çå车é度 |
| | | self.max_yawrate = p.get('max_yawrate', pi/4) # æ´ä¿å®ç转åéç |
| | | |
| | | # 软å¯å¨åå åé |
| | | self.accel = p.get('accel', 0.2) # m/s^2 |
| | | self.current_speed_target = 0.0 # ç®æ é度 |
| | | self.current_speed = 0.0 # å®é
é度 |
| | | |
| | | # æ§å¶å¢çï¼æé«ååºé度åç²¾åº¦ï¼ |
| | | self.k_heading = p.get('k_heading', 1.8) # 大å¹
æé«è½¬åå¢çï¼å å¿«èªåæ¶æ |
| | | self.k_heading_d = p.get('k_heading_d', 0.25) # å¢å å¾®åä½ç¨ï¼æå¶æ¯è¡ |
| | | self.k_xte = p.get('k_xte', 1.2) # 大å¹
æé«æ¨ªå误差å¢çï¼æ¿è¿çº æ£å离 |
| | | self.k_curv = p.get('k_curv', 0.2) # é使²çåé¦å¢ç |
| | | self.base_speed = p.get('base_speed', 0.5) # æé«åºç¡é度 |
| | | |
| | | # è·¯å¾è·è¸ªåæ°ï¼åå°lookahead以æé«ç²¾åº¦ï¼ |
| | | self.lookahead_time = p.get('lookahead_time', 2.0) # åå°åè§æ¶é´ |
| | | # åå°æå°åè§è·ç¦»ä»¥æé«è½¨è¿¹ç²¾åº¦ |
| | | self.lookahead_min = p.get('lookahead_min', 0.35) |
| | | self.lookahead_max = p.get('lookahead_max', 1.5) # å尿大åè§è·ç¦» |
| | | self.goal_tolerance = p.get('goal_tolerance', 0.3) # m |
| | | self.slow_zone = p.get('slow_zone', 1.5) # åå°åéåºå |
| | | self.final_approach_dist = p.get('final_approach_dist', 0.8) # åå°ç²¾ç¡®æ¥è¿åºå |
| | | |
| | | # æ°å¢ï¼æ¨ªå误差ç¸å
³ |
| | | self.max_xte = p.get('max_xte', 1.0) # åå°æå¤§æ¨ªå误差éå¶ï¼æé«ç²¾åº¦ |
| | | self.xte_gain_scale = p.get('xte_gain_scale', 1.0) # ç§»é¤æ¨ªå误差缩æ¾ï¼ä½¿ç¨å®æ´å¢ç |
| | | |
| | | # é度æ§å¶ç¼©æ¾åæ° |
| | | self.curv_gain_scale = p.get('curv_gain_scale', 2.0) # æ²ç缩æ¾å¢ç |
| | | self.heading_err_gain_scale = p.get('heading_err_gain_scale', 1.5) # èªå误差缩æ¾å¢ç |
| | | |
| | | # ç¶æä¼°è®¡åæ»¤æ³¢ |
| | | self.yawrate_filter = 0.0 |
| | | self.yawrate_alpha = p.get('yawrate_alpha', 0.90) # è¿ä¸æ¥å¢å å¹³æ»åº¦ |
| | | self.last_heading_err = 0.0 # ç¨äºDæ§å¶ |
| | | self.last_time = None # ç¨äºè®¡ç®dt |
| | | self.last_curv = 0.0 # ç¨äºå¹³æ»æ²çåå |
| | | |
| | | # 累积误差æ§å¶ |
| | | self.heading_err_sum = 0.0 # èªå误差积å |
| | | self.xte_sum = 0.0 # 横å误差积å |
| | | self.max_sum = 2.0 # 积åéå¹
|
| | | self.k_heading_i = p.get('k_heading_i', 0.01) # èªå误差积åå¢ç |
| | | self.k_xte_i = p.get('k_xte_i', 0.01) # 横å误差积åå¢ç |
| | | |
| | | # é误æ¢å¤ |
| | | self.consecutive_large_err = 0 # è¿ç»å¤§è¯¯å·®è®¡æ° |
| | | self.large_err_threshold = p.get('large_err_threshold', 0.5) # 大误差éå¼ |
| | | self.max_large_err_count = 10 # æå¤§è¿ç»å¤§è¯¯å·®æ¬¡æ° |
| | | |
| | | # (æ¥å¿æä»¶å°å¨ä¸é¢ç»ä¸å建) |
| | | |
| | | # state (most recent readings) |
| | | self.pos = None # (x,y) |
| | | self.heading = None # rad |
| | | self.vel = (0.0, 0.0) # vx, vy in m/s (ground frame) |
| | | self.imu_accel = None |
| | | self.imu_gyro = None |
| | | |
| | | # control stages |
| | | self.STAGE_GOTO_START = 'goto_start' # ç§»å¨å°èµ·ç¹é¶æ®µ |
| | | self.STAGE_FOLLOW_PATH = 'follow_path' # è·éè·¯å¾é¶æ®µ |
| | | self.stage = self.STAGE_GOTO_START |
| | | |
| | | # èµ·ç¹æ¥è¿åæ° - æ¶ç´§ä»¥ç¡®ä¿çæ£å°è¾¾èµ·ç¹ |
| | | self.start_tolerance = p.get('start_tolerance', 0.12) # ä½ç½®å®¹å·®ï¼æ¶ç´§å°12cmï¼ |
| | | self.start_heading_tolerance = p.get('start_heading_tolerance', 0.01) # èªå容差 (~14Â°ï¼æ¶ç´§ï¼ |
| | | self.start_approach_dist = p.get('start_approach_dist', 1.5) # åéè·ç¦» |
| | | self.start_slow_speed = p.get('start_slow_speed', 0.25) # æ¥è¿é度 |
| | | self.start_min_speed = p.get('start_min_speed', 0.25) # æé«æå°é度 |
| | | # æ°å¢ï¼å°èµ·ç¹æ¶å
转åååè¿çéå¼ï¼è¥ä¸èµ·ç¹æ¹åè¯¯å·®å¤§äºæ¤éå¼ï¼åå
转åï¼ |
| | | self.start_turn_first_heading = p.get('start_turn_first_heading', pi/50) # ~15度 |
| | | # è·¯å¾è·éé¶æ®µçæå°åè¿é度ï¼é²æ¢æ å°/缩æ¾å¯¼è´é度å½é¶ï¼ |
| | | self.min_follow_speed = p.get('min_follow_speed', 0.08) |
| | | # Recovery 模å¼åæ° |
| | | self.in_recovery = False |
| | | self.recovery_dist = p.get('recovery_dist', 2.0) |
| | | self.recovery_exit_dist = p.get('recovery_exit_dist', 0.8) |
| | | self.recovery_speed = p.get('recovery_speed', 0.55) |
| | | |
| | | # progress - åªä½¿ç¨ç¹ç´¢å¼ï¼ä¸ä½¿ç¨segment |
| | | self.current_target_idx = 0 |
| | | self.nearest_path_idx = 0 # æè¿è·¯å¾ç¹ç´¢å¼ |
| | | self.finished = False |
| | | # goto_start åç¶æï¼'rotate'|'move'|'align'ï¼ä¿è¯é¡ºåºæ§è¡ä¸ä¸åå¤åæ¢ |
| | | self.goto_substage = 'rotate' |
| | | |
| | | # æç¹è½¬ååæ° |
| | | self.waypoint_approach_mode = False # æ¯å¦å¨æ¥è¿æç¹æ¨¡å¼ |
| | | self.waypoint_turn_mode = False # æ¯å¦å¨æç¹è½¬åæ¨¡å¼ |
| | | self.waypoint_turn_target_idx = None # å½å转åçæç¹å¨åå§è·¯å¾ä¸çç´¢å¼ |
| | | self.waypoint_turn_tolerance = p.get('waypoint_turn_tolerance', 0.12) # å°è¾¾æç¹çè·ç¦»å®¹å·®(12cm) |
| | | self.waypoint_approach_dist = p.get('waypoint_approach_dist', 0.8) # å¼å§åéæ¥è¿æç¹çè·ç¦»(80cm) |
| | | self.waypoint_turn_heading_tol = p.get('waypoint_turn_heading_tol', 0.1) # 转å宿çèªå容差(~6度) |
| | | self.waypoint_turned = set() # è®°å½å·²ç»è½¬åè¿çæç¹ï¼é¿å
éå¤è½¬å |
| | | # å建æ¥å¿æä»¶ |
| | | self.log_file = open('controller_log.csv', 'w') |
| | | # ç®åæ¥å¿æ ¼å¼ï¼ç§»é¤segmentç¸å
³ |
| | | self.log_file.write('t,x,y,heading,target_x,target_y,target_dist,target_idx,xte,heading_err,forward,turn,speed,stage,status\n') |
| | | |
| | | def update_gps(self, pos_xy, heading, vel_xy=None, timestamp=None): |
| | | """Update GPS-like measurements at ~10Hz. |
| | | |
| | | pos_xy: (x,y) in meters |
| | | heading: radians |
| | | vel_xy: (vx, vy) in m/s in global frame (optional) |
| | | """ |
| | | self.pos = tuple(pos_xy) |
| | | self.heading = float(heading) |
| | | if vel_xy is not None: |
| | | self.vel = tuple(vel_xy) |
| | | |
| | | def update_imu(self, accel3, gyro3, timestamp=None): |
| | | """Update IMU (100Hz). accel3 and gyro3 are 3-element tuples.""" |
| | | self.imu_accel = tuple(accel3) |
| | | self.imu_gyro = tuple(gyro3) |
| | | |
| | | def _calc_xte(self, p1, p2, pos): |
| | | """Calculate cross track error (perpendicular distance to line segment)""" |
| | | x, y = pos |
| | | x1, y1 = p1 |
| | | x2, y2 = p2 |
| | | |
| | | # Convert to local coordinates |
| | | dx = x2 - x1 |
| | | dy = y2 - y1 |
| | | l2 = dx*dx + dy*dy |
| | | |
| | | if l2 == 0: # points are identical |
| | | return hypot(x - x1, y - y1) |
| | | |
| | | # Project point onto line |
| | | t = ((x - x1)*dx + (y - y1)*dy) / l2 |
| | | |
| | | if t < 0: # before start |
| | | return hypot(x - x1, y - y1) |
| | | elif t > 1: # after end |
| | | return hypot(x - x2, y - y2) |
| | | else: # projection point lies on segment |
| | | px = x1 + t*dx |
| | | py = y1 + t*dy |
| | | xte = hypot(x - px, y - py) |
| | | # Determine sign based on which side of the line the point is on |
| | | sign = 1 if (x - px)*dy - (y - py)*dx > 0 else -1 |
| | | return sign * xte |
| | | |
| | | def _is_waypoint_turn_needed(self, idx): |
| | | """ |
| | | æ£æµè·¯å¾ç¹idxæ¯å¦æ¯æç¹ï¼éè¦åå°è½¬åï¼ |
| | | |
| | | æç¹å®ä¹ï¼ååä¸¤æ®µè·¯å¾æ¹åååè¶
è¿45度 |
| | | """ |
| | | if idx <= 0 or idx >= len(self.path) - 1: |
| | | return False |
| | | |
| | | # å䏿®µæ¹å |
| | | p_prev = self.path[idx - 1] |
| | | p_curr = self.path[idx] |
| | | p_next = self.path[idx + 1] |
| | | |
| | | dx1 = p_curr[0] - p_prev[0] |
| | | dy1 = p_curr[1] - p_prev[1] |
| | | dx2 = p_next[0] - p_curr[0] |
| | | dy2 = p_next[1] - p_curr[1] |
| | | |
| | | # 计ç®ä¸¤æ®µæ¹å |
| | | heading1 = atan2(dy1, dx1) |
| | | heading2 = atan2(dy2, dx2) |
| | | |
| | | # æ¹åååè§åº¦ |
| | | angle_diff = abs(wrap_angle(heading2 - heading1)) |
| | | |
| | | # 妿æ¹åååè¶
è¿45度ï¼è®¤ä¸ºæ¯æç¹ |
| | | return angle_diff > pi / 4 |
| | | |
| | | def _find_nearest_original_waypoint(self): |
| | | """ |
| | | å¨åå§è·¯å¾ä¸æ¾å°ææ¥è¿å½åä½ç½®çæç¹ |
| | | |
| | | Returns: (waypoint_idx, distance) æ (None, None) |
| | | """ |
| | | if self.pos is None: |
| | | return None, None |
| | | |
| | | x, y = self.pos |
| | | nearest_waypoint_idx = None |
| | | nearest_dist = float('inf') |
| | | |
| | | # éååå§è·¯å¾çæææç¹ |
| | | for i in range(1, len(self.original_path) - 1): |
| | | # æ£æ¥æ¯å¦æ¯æç¹ |
| | | p_prev = self.original_path[i - 1] |
| | | p_curr = self.original_path[i] |
| | | p_next = self.original_path[i + 1] |
| | | |
| | | dx1 = p_curr[0] - p_prev[0] |
| | | dy1 = p_curr[1] - p_prev[1] |
| | | dx2 = p_next[0] - p_curr[0] |
| | | dy2 = p_next[1] - p_curr[1] |
| | | |
| | | heading1 = atan2(dy1, dx1) |
| | | heading2 = atan2(dy2, dx2) |
| | | angle_diff = abs(wrap_angle(heading2 - heading1)) |
| | | |
| | | # å¦ææ¯æç¹ï¼æ¹ååå>45åº¦ï¼ |
| | | if angle_diff > pi / 4: |
| | | dist = hypot(p_curr[0] - x, p_curr[1] - y) |
| | | if dist < nearest_dist: |
| | | nearest_dist = dist |
| | | nearest_waypoint_idx = i |
| | | |
| | | return nearest_waypoint_idx, nearest_dist |
| | | |
| | | def _get_target_heading_at_waypoint(self, idx): |
| | | """è·ååå§è·¯å¾ç¹idxå¤çç®æ èªåï¼ä¸ä¸æ®µçæ¹åï¼""" |
| | | if idx >= len(self.original_path) - 1: |
| | | # æåä¸ä¸ªç¹ï¼è¿åå䏿®µæ¹å |
| | | if idx > 0: |
| | | dx = self.original_path[idx][0] - self.original_path[idx-1][0] |
| | | dy = self.original_path[idx][1] - self.original_path[idx-1][1] |
| | | return atan2(dy, dx) |
| | | return 0.0 |
| | | |
| | | # è¿åå°ä¸ä¸ä¸ªç¹çæ¹å |
| | | dx = self.original_path[idx + 1][0] - self.original_path[idx][0] |
| | | dy = self.original_path[idx + 1][1] - self.original_path[idx][1] |
| | | return atan2(dy, dx) |
| | | |
| | | def _find_lookahead_point(self, lookahead_dist): |
| | | """ |
| | | ç®åçPure Pursuitåè§ç¹æ¥æ¾ - åªä½¿ç¨ç¹ç´¢å¼ |
| | | |
| | | ä»nearest_path_idxå¼å§ååæç´¢ï¼æ¾å°ç¬¬ä¸ä¸ªè·ç¦»>=lookahead_distçç¹ |
| | | |
| | | Returns: (lookahead_xy, curvature, lookahead_idx, nearest_idx) |
| | | """ |
| | | if self.pos is None: |
| | | return self.path[0], 0.0, 0, 0 |
| | | |
| | | x, y = self.pos |
| | | |
| | | # 1. æ¾æè¿ç¹ - å¦æç¦»è·¯å¾å¾è¿,使ç¨å
¨å±æç´¢;å¦å使ç¨çªå£æç´¢ |
| | | # å
æ£æ¥å½ånearest_path_idxçè·ç¦» |
| | | px, py = self.path[self.nearest_path_idx] |
| | | current_nearest_dist = hypot(px - x, py - y) |
| | | |
| | | # 妿è·ç¦»>1.0m,使ç¨å
¨å±æç´¢ |
| | | if current_nearest_dist > 1.0: |
| | | search_start = 0 |
| | | search_end = len(self.path) |
| | | else: |
| | | # 使ç¨çªå£æç´¢ |
| | | search_start = max(0, self.nearest_path_idx - 5) |
| | | search_end = min(len(self.path), self.nearest_path_idx + 30) |
| | | |
| | | nearest_idx = self.nearest_path_idx |
| | | nearest_dist = float('inf') |
| | | |
| | | for i in range(search_start, search_end): |
| | | px, py = self.path[i] |
| | | d = hypot(px - x, py - y) |
| | | if d < nearest_dist: |
| | | nearest_dist = d |
| | | nearest_idx = i |
| | | |
| | | self.nearest_path_idx = nearest_idx # æ´æ°æè¿ç¹ç´¢å¼ |
| | | |
| | | # 2. ä»nearest_idxå¼å§ååæç´¢lookaheadç¹ |
| | | lookahead_idx = nearest_idx |
| | | for i in range(nearest_idx, len(self.path)): |
| | | px, py = self.path[i] |
| | | d = hypot(px - x, py - y) |
| | | if d >= lookahead_dist: |
| | | lookahead_idx = i |
| | | break |
| | | else: |
| | | # 没æ¾å°ï¼ä½¿ç¨æåä¸ä¸ªç¹ |
| | | lookahead_idx = len(self.path) - 1 |
| | | |
| | | # 3. 估计æ²çï¼åºäºnearest_idxéè¿çç¹ï¼ |
| | | curvature = 0.0 |
| | | if 1 <= nearest_idx < len(self.path) - 1: |
| | | p1 = self.path[nearest_idx - 1] |
| | | p2 = self.path[nearest_idx] |
| | | p3 = self.path[nearest_idx + 1] |
| | | |
| | | v1 = (p2[0] - p1[0], p2[1] - p1[1]) |
| | | v2 = (p3[0] - p2[0], p3[1] - p2[1]) |
| | | |
| | | l1 = hypot(v1[0], v1[1]) |
| | | l2 = hypot(v2[0], v2[1]) |
| | | |
| | | if l1 > 1e-6 and l2 > 1e-6: |
| | | dot = v1[0]*v2[0] + v1[1]*v2[1] |
| | | cos_angle = max(-1.0, min(1.0, dot / (l1 * l2))) |
| | | angle_change = abs(pi - acos(cos_angle)) |
| | | curvature = angle_change / max(l1, 1e-6) |
| | | |
| | | return self.path[lookahead_idx], curvature, lookahead_idx, nearest_idx |
| | | |
| | | def _find_nearest_original_waypoint(self): |
| | | """ |
| | | å¨åå§è·¯å¾ä¸æ¥æ¾ä¸ä¸ä¸ªæªå°è¾¾çæç¹ |
| | | æç
§è·¯å¾é¡ºåºæ¥æ¾ï¼ä¸è·³è¿ |
| | | |
| | | Returns: (waypoint_idx, distance) æ (None, inf) å¦ææ²¡ææ¾å° |
| | | """ |
| | | if not hasattr(self, 'original_path') or len(self.original_path) < 3: |
| | | return None, float('inf') |
| | | |
| | | x, y = self.pos |
| | | |
| | | # éååå§è·¯å¾ä¸çæç¹ï¼è·³è¿èµ·ç¹åç»ç¹ï¼ |
| | | for i in range(1, len(self.original_path) - 1): |
| | | # 妿è¿ä¸ªæç¹å·²ç»è½¬åè¿ï¼è·³è¿ |
| | | if i in self.waypoint_turned: |
| | | continue |
| | | |
| | | # æ£æ¥æ¹åæ¯å¦æ¹å |
| | | prev_pt = self.original_path[i-1] |
| | | curr_pt = self.original_path[i] |
| | | next_pt = self.original_path[i+1] |
| | | |
| | | # 计ç®ååæ¹ååé |
| | | dx1 = curr_pt[0] - prev_pt[0] |
| | | dy1 = curr_pt[1] - prev_pt[1] |
| | | dx2 = next_pt[0] - curr_pt[0] |
| | | dy2 = next_pt[1] - curr_pt[1] |
| | | |
| | | # 计ç®è§åº¦ |
| | | angle1 = atan2(dy1, dx1) |
| | | angle2 = atan2(dy2, dx2) |
| | | angle_change = abs(wrap_angle(angle2 - angle1)) |
| | | |
| | | # 妿è§åº¦åå大äº45度ï¼è®¤ä¸ºæ¯æç¹ |
| | | if angle_change > pi / 4: |
| | | # æ¾å°ç¬¬ä¸ä¸ªæªå®æçæç¹ï¼ç´æ¥è¿å |
| | | dist = hypot(curr_pt[0] - x, curr_pt[1] - y) |
| | | return i, dist |
| | | |
| | | return None, float('inf') |
| | | |
| | | def _get_target_heading_at_waypoint(self, waypoint_idx): |
| | | """ |
| | | è·åæç¹å¤çç®æ èªåï¼ä¸ä¸æ®µè·¯å¾çæ¹åï¼ |
| | | |
| | | Args: |
| | | waypoint_idx: æç¹å¨åå§è·¯å¾ä¸çç´¢å¼ |
| | | |
| | | Returns: ç®æ èªåï¼å¼§åº¦ï¼ |
| | | """ |
| | | if waypoint_idx is None: |
| | | return 0.0 # é»è®¤åä¸ |
| | | |
| | | # å¦ææ¯æåä¸ä¸ªç¹ï¼ä½¿ç¨å䏿®µçæ¹åï¼ä¿æä¸åï¼ |
| | | if waypoint_idx >= len(self.original_path) - 1: |
| | | if waypoint_idx > 0: |
| | | prev_pt = self.original_path[waypoint_idx - 1] |
| | | curr_pt = self.original_path[waypoint_idx] |
| | | return atan2(curr_pt[1] - prev_pt[1], curr_pt[0] - prev_pt[0]) |
| | | else: |
| | | return 0.0 # åªæä¸ä¸ªç¹ï¼é»è®¤åä¸ |
| | | |
| | | curr_pt = self.original_path[waypoint_idx] |
| | | next_pt = self.original_path[waypoint_idx + 1] |
| | | |
| | | dx = next_pt[0] - curr_pt[0] |
| | | dy = next_pt[1] - curr_pt[1] |
| | | |
| | | return atan2(dy, dx) |
| | | |
| | | def _compute_xte_simple(self, pos): |
| | | """ |
| | | ç®åçæ¨ªåè¯¯å·®è®¡ç® - åºäºæè¿ç¹ |
| | | |
| | | Returns: (xte, proj_x, proj_y) |
| | | """ |
| | | x, y = pos |
| | | nearest_idx = self.nearest_path_idx |
| | | |
| | | # 计ç®å°æè¿ç¹çåé |
| | | px, py = self.path[nearest_idx] |
| | | |
| | | # 妿æä¸ä¸ä¸ªç¹ï¼è®¡ç®å°è·¯å¾æ®µç横å误差 |
| | | if nearest_idx < len(self.path) - 1: |
| | | nx, ny = self.path[nearest_idx + 1] |
| | | |
| | | # 段æ¹ååé |
| | | dx = nx - px |
| | | dy = ny - py |
| | | seg_len2 = dx*dx + dy*dy |
| | | |
| | | if seg_len2 > 1e-9: |
| | | # æå½±å°æ®µä¸ |
| | | t = ((x - px)*dx + (y - py)*dy) / seg_len2 |
| | | t = max(0.0, min(1.0, t)) # éå¶å¨[0,1] |
| | | |
| | | proj_x = px + t * dx |
| | | proj_y = py + t * dy |
| | | |
| | | # 横å误差ï¼å¸¦ç¬¦å·ï¼ |
| | | xte_vec_x = x - proj_x |
| | | xte_vec_y = y - proj_y |
| | | xte_mag = hypot(xte_vec_x, xte_vec_y) |
| | | |
| | | # 使ç¨åä¹å¤æå·¦å³ |
| | | cross = xte_vec_x * dy - xte_vec_y * dx |
| | | xte = xte_mag if cross > 0 else -xte_mag |
| | | |
| | | return xte, proj_x, proj_y |
| | | |
| | | # å¦åç´æ¥ç¨å°æè¿ç¹çè·ç¦» |
| | | xte = hypot(x - px, y - py) |
| | | return xte, px, py |
| | | |
| | | def compute_control(self, timestamp=None): |
| | | """Compute forward and turn outputs mapped to [-100,100]. |
| | | |
| | | Returns dict with keys: forward(int), turn(int), info(dict) |
| | | """ |
| | | if self.finished: |
| | | return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}} |
| | | |
| | | if self.pos is None or self.heading is None: |
| | | return {'forward': 0, 'turn': 0, 'info': {'status': 'no_state'}} |
| | | |
| | | # ensure timestamp is numeric for logging/formatting |
| | | if timestamp is None: |
| | | timestamp = time.time() |
| | | |
| | | # è®¡ç®æ¶é´å¢é |
| | | if self.last_time is None: |
| | | dt = 0.014 # 约74Hz |
| | | else: |
| | | dt = timestamp - self.last_time if timestamp else 0.014 |
| | | self.last_time = timestamp if timestamp else (self.last_time + dt if self.last_time else 0.0) |
| | | |
| | | x, y = self.pos |
| | | |
| | | # å¤çç§»å¨å°èµ·ç¹é¶æ®µï¼ä½¿ç¨æä¹
åç¶æç¡®ä¿é¡ºåºæ§è¡ï¼rotate -> move -> align |
| | | if self.stage == self.STAGE_GOTO_START: |
| | | start_point = self.path[0] |
| | | dx = start_point[0] - x |
| | | dy = start_point[1] - y |
| | | dist_to_start = hypot(dx, dy) |
| | | |
| | | # ç®æ æåæåèµ·ç¹ |
| | | desired_heading_to_start = atan2(dy, dx) |
| | | heading_err_to_start = wrap_angle(desired_heading_to_start - self.heading) |
| | | |
| | | # åå§ååç¶æï¼ä¿å®ï¼ |
| | | if not hasattr(self, 'goto_substage') or self.goto_substage is None: |
| | | self.goto_substage = 'rotate' |
| | | |
| | | # åç¶æï¼rotate -> move -> align |
| | | if self.goto_substage == 'rotate': |
| | | # åå°è½¬åç´å°æå误差足å¤å°ï¼ç§»é¤è·ç¦»éåºæ¡ä»¶ï¼ç¡®ä¿å
è½¬å¥½ï¼ |
| | | if abs(heading_err_to_start) <= self.start_turn_first_heading: |
| | | self.goto_substage = 'move' |
| | | else: |
| | | yawrate_cmd = self.k_heading * heading_err_to_start * 1.3 # ç¨å¾®æé«è½¬åé度 |
| | | turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) |
| | | turn_signal = max(-100, min(100, turn_signal)) |
| | | forward_signal = 0 |
| | | # target is start_point for goto_start substage |
| | | target_x, target_y = start_point |
| | | target_dist = hypot(target_x - x, target_y - y) |
| | | target_idx = 0 |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},' |
| | | f'0.000,{heading_err_to_start:.3f},{forward_signal},{turn_signal},0.000,{self.stage},rotate_to_start\n') |
| | | self.log_file.flush() |
| | | return {'forward': forward_signal, 'turn': turn_signal, 'info': { |
| | | 'status': 'rotate_to_start', 'stage': self.stage, 'heading_err': heading_err_to_start, |
| | | 'dist_to_start': dist_to_start |
| | | }} |
| | | |
| | | if self.goto_substage == 'move': |
| | | # å¨ç§»å¨åç¶ææé´ï¼ä¸ä¼åå° rotate åç¶æï¼ç´å°æ¾å¼åæ¢ |
| | | desired_heading = desired_heading_to_start |
| | | heading_err = wrap_angle(desired_heading - self.heading) |
| | | |
| | | # å¦æå·²ç»æ¥è¿èµ·ç¹åè¿å
¥ align åç¶æ |
| | | if dist_to_start < self.start_tolerance: |
| | | self.goto_substage = 'align' |
| | | else: |
| | | # 常è§ï¼é¢åèµ·ç¹å¹¶åè¿ |
| | | if dist_to_start < self.start_approach_dist: |
| | | dist_ratio = dist_to_start / self.start_approach_dist |
| | | speed_scale = 0.4 + 0.6 * dist_ratio # æé«æå°é度æ¯ä¾ |
| | | base_target_speed = self.start_slow_speed + (self.base_speed - self.start_slow_speed) * dist_ratio |
| | | else: |
| | | speed_scale = 1.0 |
| | | base_target_speed = min(self.base_speed * 0.8, 0.5) # é嶿大é度é¿å
è¿å² |
| | | |
| | | # èªåè¯¯å·®ç¼©æ¾æ´å®½æ¾ |
| | | heading_ratio = abs(heading_err) / pi |
| | | err_scale = 0.5 + 0.5 * (1.0 - heading_ratio) # æ´å®½æ¾ |
| | | target_speed = max(self.start_min_speed * 1.5, base_target_speed * speed_scale * err_scale) |
| | | target_speed = max(target_speed, self.start_min_speed * 1.2) # ç¡®ä¿æå°é度 |
| | | |
| | | yawrate_cmd = self.k_heading * heading_err |
| | | forward_signal = int(round((target_speed / self.max_forward_mps) * 100)) |
| | | forward_signal = max(-100, min(100, forward_signal)) |
| | | turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) |
| | | turn_signal = max(-100, min(100, turn_signal)) |
| | | |
| | | target_x, target_y = start_point |
| | | target_dist = hypot(target_x - x, target_y - y) |
| | | target_idx = 0 |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},' |
| | | f'0.000,{heading_err:.3f},{forward_signal},{turn_signal},{target_speed:.3f},{self.stage},moving_to_start\n') |
| | | self.log_file.flush() |
| | | |
| | | return {'forward': forward_signal, 'turn': turn_signal, 'info': { |
| | | 'status': 'moving_to_start', 'stage': self.stage, 'dist_to_start': dist_to_start, |
| | | 'heading_err': heading_err |
| | | }} |
| | | |
| | | # align åç¶æï¼å°è¾¾èµ·ç¹åè¿è¡åå°æåä¿®æ£ï¼å®æå忢å°è·è¸ª |
| | | if self.goto_substage == 'align': |
| | | if len(self.path) > 1: |
| | | path_dx = self.path[1][0] - self.path[0][0] |
| | | path_dy = self.path[1][1] - self.path[0][1] |
| | | desired_start_heading = atan2(path_dy, path_dx) |
| | | else: |
| | | desired_start_heading = 0.0 |
| | | heading_err_final = wrap_angle(desired_start_heading - self.heading) |
| | | # 妿æç»æå坹齿é常æ¥è¿ä½ç½®ï¼å忢å°è·è¸ª |
| | | # åªæåæ¶æ»¡è¶³ä½ç½®ä¸èªåæ¡ä»¶æç®çæ£å°è¾¾èµ·ç¹ï¼é¿å
âåè¿+æåå·®âæå忢 |
| | | if (dist_to_start < self.start_tolerance) and (abs(heading_err_final) < self.start_heading_tolerance): |
| | | self.stage = self.STAGE_FOLLOW_PATH |
| | | self.goto_substage = None |
| | | # ä¿æ current_target_idx = 0ï¼è®© pure pursuit èªç¶éæ©åè§ç¹ |
| | | self.current_target_idx = 0 |
| | | # éç½®é度为è¾å°å¼ï¼é¿å
lookahead è¿å¤§ |
| | | self.current_speed = self.start_min_speed |
| | | # å°å¹
åè¿ä»¥ç¡®ä¿ä¸å¡å¨èµ·ç¹ |
| | | f_sig = int(round((self.start_min_speed / self.max_forward_mps) * 100)) |
| | | target_x, target_y = self.path[0] # 使ç¨èµ·ç¹ä½ä¸ºåå§ç®æ |
| | | target_dist = hypot(target_x - x, target_y - y) |
| | | target_idx = 0 |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},' |
| | | f'0.000,{heading_err_final:.3f},{f_sig},0,{self.start_min_speed:.3f},{self.stage},reached_start\n') |
| | | self.log_file.flush() |
| | | return {'forward': f_sig, 'turn': 0, 'info': {'status': 'reached_start', 'stage': self.stage, 'advanced_idx': self.current_target_idx}} |
| | | else: |
| | | # å¨èµ·ç¹éè¿ä½æåä¸å¯¹ï¼è¿è¡åå°ä¿®æ£ï¼åæ¢åè¿ï¼ |
| | | sign = 1 if heading_err_final >= 0 else -1 |
| | | yawrate_cmd = sign * min(self.max_yawrate * 0.6, max(0.05, abs(heading_err_final) * 0.8)) |
| | | turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) |
| | | turn_signal = max(-100, min(100, turn_signal)) |
| | | target_x, target_y = start_point |
| | | target_dist = hypot(target_x - x, target_y - y) |
| | | target_idx = 0 |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{target_idx},' |
| | | f'0.000,{heading_err_final:.3f},0,{turn_signal},{0.000:.3f},{self.stage},align_at_start\n') |
| | | self.log_file.flush() |
| | | return {'forward': 0, 'turn': turn_signal, 'info': {'status': 'align_at_start', 'heading_err': heading_err_final}} |
| | | |
| | | # è®¡ç®æ¶é´å¢é |
| | | if self.last_time is None: |
| | | dt = 0.014 # 约74Hz |
| | | else: |
| | | dt = timestamp - self.last_time if timestamp else 0.014 |
| | | self.last_time = timestamp if timestamp else (self.last_time + dt if self.last_time else 0.0) |
| | | |
| | | # è®¡ç®æ¨ªå误差ï¼ä½¿ç¨ç®åæ¹æ³ï¼ |
| | | x, y = self.pos |
| | | xte, proj_x, proj_y = self._compute_xte_simple(self.pos) |
| | | |
| | | # é¢å
计ç®å°ç»ç¹çè·ç¦» |
| | | final_dist = hypot(self.path[-1][0] - x, self.path[-1][1] - y) |
| | | |
| | | # ========== æç¹æ¥è¿åè½¬åæ¨¡å¼æ£æµ ========== |
| | | # æ£æ¥æ¯å¦éè¦è¿å
¥æç¹æ¥è¿æè½¬åæ¨¡å¼ |
| | | if not self.waypoint_approach_mode and not self.waypoint_turn_mode: |
| | | # å¨åå§è·¯å¾ä¸æ¥æ¾æè¿çæç¹ |
| | | waypoint_idx, waypoint_dist = self._find_nearest_original_waypoint() |
| | | |
| | | # 妿æç¹å¨æ¥è¿è·ç¦»å
ï¼2mï¼ä¸æªè½¬åè¿ï¼è¿å
¥æ¥è¿æ¨¡å¼ |
| | | # 注æï¼å¢å¤§æ£æµè·ç¦»ï¼ç¡®ä¿å¨çè¾¹ä¹è½æåè¿å
¥æ¥è¿æ¨¡å¼ |
| | | if (waypoint_idx is not None and |
| | | waypoint_dist < 2.0 and |
| | | waypoint_idx not in self.waypoint_turned): |
| | | self.waypoint_approach_mode = True |
| | | self.waypoint_turn_target_idx = waypoint_idx # è®°å½å½åæ¥è¿çæç¹ |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{self.original_path[waypoint_idx][0]:.3f},' |
| | | f'{self.original_path[waypoint_idx][1]:.3f},' |
| | | f'{waypoint_dist:.3f},{waypoint_idx},' |
| | | f'{xte:.3f},0.000,0,0,0.000,{self.stage},enter_waypoint_approach\n') |
| | | self.log_file.flush() |
| | | |
| | | # ========== æç¹æ¥è¿æ¨¡å¼ï¼ç²¾ç¡®æ¥è¿æç¹ä½ç½® ========== |
| | | if self.waypoint_approach_mode: |
| | | waypoint_x, waypoint_y = self.original_path[self.waypoint_turn_target_idx] |
| | | dx = waypoint_x - x |
| | | dy = waypoint_y - y |
| | | waypoint_dist = hypot(dx, dy) |
| | | |
| | | # æ£æ¥æ¯å¦å°è¾¾æç¹ï¼12cmå®¹å·®ï¼ |
| | | if waypoint_dist < self.waypoint_turn_tolerance: |
| | | # å°è¾¾æç¹ï¼åæ¢å°è½¬åæ¨¡å¼ |
| | | self.waypoint_approach_mode = False |
| | | self.waypoint_turn_mode = True |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{waypoint_x:.3f},{waypoint_y:.3f},' |
| | | f'{waypoint_dist:.3f},{self.waypoint_turn_target_idx},' |
| | | f'{xte:.3f},0.000,0,0,0.000,{self.stage},reached_waypoint\n') |
| | | self.log_file.flush() |
| | | else: |
| | | # ç´æ¥æåæç¹åè¿ï¼å¿½ç¥è·¯å¾ï¼ç´æ¥ç®æ ç¹å¯¼èªï¼ |
| | | desired_heading_to_waypoint = atan2(dy, dx) |
| | | heading_err_to_waypoint = wrap_angle(desired_heading_to_waypoint - self.heading) |
| | | |
| | | # é度æ§å¶ï¼æ¥è¿æ¶éæ¸åé |
| | | if waypoint_dist < 0.5: # 50cmå
åé |
| | | speed_ratio = waypoint_dist / 0.5 |
| | | target_speed = self.start_min_speed * (0.4 + 0.6 * speed_ratio) |
| | | else: |
| | | target_speed = self.base_speed * 0.7 |
| | | |
| | | # èªå误差ä¹ä¼éä½é度 |
| | | heading_ratio = abs(heading_err_to_waypoint) / pi |
| | | err_scale = 0.5 + 0.5 * (1.0 - heading_ratio) |
| | | target_speed = target_speed * err_scale |
| | | target_speed = max(self.start_min_speed * 0.5, target_speed) |
| | | |
| | | # è½¬åæ§å¶ |
| | | yawrate_cmd = self.k_heading * heading_err_to_waypoint * 1.3 # ç¨å¾®æé«è½¬åå¢ç |
| | | |
| | | forward_signal = int(round((target_speed / self.max_forward_mps) * 100)) |
| | | forward_signal = max(-100, min(100, forward_signal)) |
| | | turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) |
| | | turn_signal = max(-100, min(100, turn_signal)) |
| | | |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{waypoint_x:.3f},{waypoint_y:.3f},' |
| | | f'{waypoint_dist:.3f},{self.waypoint_turn_target_idx},' |
| | | f'{xte:.3f},{heading_err_to_waypoint:.3f},{forward_signal},{turn_signal},{target_speed:.3f},{self.stage},approaching_waypoint\n') |
| | | self.log_file.flush() |
| | | |
| | | return {'forward': forward_signal, 'turn': turn_signal, 'info': { |
| | | 'status': 'approaching_waypoint', |
| | | 'waypoint_dist': waypoint_dist, |
| | | 'heading_err': heading_err_to_waypoint, |
| | | 'target_speed': target_speed |
| | | }} |
| | | |
| | | # ========== æç¹è½¬å模å¼ï¼åå°è½¬å ========== |
| | | if self.waypoint_turn_mode: |
| | | # è·åç®æ èªåï¼ä¸ä¸æ®µçæ¹åï¼ |
| | | target_heading = self._get_target_heading_at_waypoint(self.waypoint_turn_target_idx) |
| | | heading_err = wrap_angle(target_heading - self.heading) |
| | | |
| | | # æ£æ¥æ¯å¦è½¬å宿 |
| | | if abs(heading_err) < self.waypoint_turn_heading_tol: |
| | | # 转å宿ï¼éåºè½¬åæ¨¡å¼ |
| | | self.waypoint_turned.add(self.waypoint_turn_target_idx) # è®°å½å·²è½¬å |
| | | self.waypoint_turn_mode = False |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{self.original_path[self.waypoint_turn_target_idx][0]:.3f},' |
| | | f'{self.original_path[self.waypoint_turn_target_idx][1]:.3f},' |
| | | f'0.000,{self.waypoint_turn_target_idx},' |
| | | f'{xte:.3f},{heading_err:.3f},0,0,0.000,{self.stage},exit_waypoint_turn\n') |
| | | self.log_file.flush() |
| | | else: |
| | | # åå°è½¬åï¼forward=0, turnåºäºèªå误差 |
| | | yawrate_cmd = self.k_heading * heading_err * 1.2 # ç¨å¾®æé«è½¬åå¢ç |
| | | turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) |
| | | turn_signal = max(-100, min(100, turn_signal)) |
| | | |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{self.original_path[self.waypoint_turn_target_idx][0]:.3f},' |
| | | f'{self.original_path[self.waypoint_turn_target_idx][1]:.3f},' |
| | | f'0.000,{self.waypoint_turn_target_idx},' |
| | | f'{xte:.3f},{heading_err:.3f},0,{turn_signal},0.000,{self.stage},waypoint_turning\n') |
| | | self.log_file.flush() |
| | | |
| | | return {'forward': 0, 'turn': turn_signal, 'info': { |
| | | 'status': 'waypoint_turning', |
| | | 'heading_err': heading_err, |
| | | 'target_heading': target_heading |
| | | }} |
| | | |
| | | # 卿åè§è·ç¦»ï¼åºäºå½åéåº¦åæ¨ªå误差 |
| | | actual_speed = abs(self.current_speed) |
| | | xte_mag = abs(xte) |
| | | |
| | | # lookahead = lookahead_min + speed_component - xte_penalty |
| | | base_lookahead = self.lookahead_min + self.lookahead_time * actual_speed - 0.3 * min(xte_mag, 0.5) |
| | | lookahead = max(self.lookahead_min, min(self.lookahead_max, base_lookahead)) |
| | | |
| | | # æ¢å¤æ¨¡å¼å¤å® |
| | | dist_to_path = abs(xte) |
| | | if not self.in_recovery and dist_to_path > self.recovery_dist: |
| | | self.in_recovery = True |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{proj_x:.3f},{proj_y:.3f},{dist_to_path:.3f},{self.nearest_path_idx},' |
| | | f'{xte:.3f},0.000,0,0,0.000,{self.stage},enter_recovery\n') |
| | | self.log_file.flush() |
| | | |
| | | if self.in_recovery and dist_to_path < self.recovery_exit_dist: |
| | | self.in_recovery = False |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{proj_x:.3f},{proj_y:.3f},{dist_to_path:.3f},{self.nearest_path_idx},' |
| | | f'{xte:.3f},0.000,0,0,0.000,{self.stage},exit_recovery\n') |
| | | self.log_file.flush() |
| | | |
| | | # ========== æ å Pure Pursuit æ§å¶ ========== |
| | | # éæ©åè§ç¹ |
| | | (lx, ly), path_curvature, lookahead_idx, nearest_idx = self._find_lookahead_point(lookahead) |
| | | self.current_target_idx = lookahead_idx |
| | | |
| | | # 计ç®åè§ç¹æ¹å |
| | | path_heading = atan2(ly - y, lx - x) |
| | | |
| | | # 横åè¯¯å·®ä¿®æ£ |
| | | xte_normalized = max(-1.0, min(1.0, xte / self.max_xte)) |
| | | xte_correction = -self.k_xte * xte_normalized * self.xte_gain_scale |
| | | |
| | | # ææèªå = åè§æ¹å + xteä¿®æ£ |
| | | desired_heading = wrap_angle(path_heading + xte_correction) |
| | | heading_err = wrap_angle(desired_heading - self.heading) |
| | | |
| | | # 计ç®èªå误差çååç |
| | | heading_err_d = (heading_err - self.last_heading_err) / dt if dt > 0 else 0 |
| | | self.last_heading_err = heading_err |
| | | |
| | | # 积å项计ç®ï¼å¸¦é²é¥±åï¼ |
| | | self.heading_err_sum += heading_err * dt |
| | | self.xte_sum += xte * dt |
| | | self.heading_err_sum = max(-self.max_sum, min(self.max_sum, self.heading_err_sum)) |
| | | self.xte_sum = max(-self.max_sum, min(self.max_sum, self.xte_sum)) |
| | | |
| | | # PIDæ§å¶ |
| | | yawrate_cmd = (self.k_heading * heading_err + |
| | | self.k_heading_d * heading_err_d + |
| | | self.k_heading_i * self.heading_err_sum + |
| | | self.k_xte_i * self.xte_sum) |
| | | |
| | | # é度æ§å¶ï¼åºäºæ²çãèªåè¯¯å·®ãæ¨ªå误差 |
| | | # 1) æ²çç¼©æ¾ |
| | | curv_scale = 1.0 / (1.0 + abs(path_curvature) * self.curv_gain_scale) |
| | | |
| | | # 2) èªåè¯¯å·®ç¼©æ¾ |
| | | heading_err_abs = abs(heading_err) |
| | | err_scale = 1.0 / (1.0 + heading_err_abs * self.heading_err_gain_scale) |
| | | |
| | | # 3) 横åè¯¯å·®ç¼©æ¾ |
| | | xte_abs = abs(xte) |
| | | xte_scale = 1.0 / (1.0 + xte_abs * self.xte_gain_scale) |
| | | |
| | | # æ··å缩æ¾ï¼å ä½å¹³å + æå°å¼ |
| | | geom_scale = (curv_scale * err_scale * xte_scale) ** (1.0/3.0) |
| | | min_scale = min(curv_scale, err_scale, xte_scale) |
| | | blend_scale = 0.8 * geom_scale + 0.2 * min_scale |
| | | |
| | | target_speed = self.base_speed * blend_scale |
| | | |
| | | # 滤波åéå¹
|
| | | self.yawrate_filter = (self.yawrate_alpha * yawrate_cmd + |
| | | (1-self.yawrate_alpha) * self.yawrate_filter) |
| | | max_yawrate = self.max_yawrate |
| | | yawrate_cmd = max(-max_yawrate, min(max_yawrate, self.yawrate_filter)) |
| | | |
| | | # æ¢å¤æ¨¡å¼è¦çï¼ä¼å
级æ´é«ï¼ |
| | | if self.in_recovery: |
| | | # æ¢å¤æ¨¡å¼ï¼æåæå½±ç¹ |
| | | path_heading = atan2(proj_y - y, proj_x - x) |
| | | xte_normalized = max(-1.0, min(1.0, xte / self.max_xte)) |
| | | xte_correction = -self.k_xte * xte_normalized * self.xte_gain_scale |
| | | desired_heading = wrap_angle(path_heading + xte_correction) |
| | | heading_err = wrap_angle(desired_heading - self.heading) |
| | | yawrate_cmd = self.k_heading * heading_err |
| | | target_speed = self.recovery_speed |
| | | # åºäºè·ç¦»ç»ç¹çè·ç¦»è°æ´é度 |
| | | in_slow_zone = final_dist < self.slow_zone |
| | | in_final_approach = final_dist < self.final_approach_dist |
| | | |
| | | # é误æ¢å¤é»è¾ |
| | | if abs(xte) > self.large_err_threshold * 1.5 or abs(heading_err) > pi/2.5: |
| | | self.consecutive_large_err += 1 |
| | | if self.consecutive_large_err > self.max_large_err_count: |
| | | # éç½®å°å®å
¨ç¶æ |
| | | self.consecutive_large_err = 0 |
| | | self.heading_err_sum = 0 |
| | | self.xte_sum = 0 |
| | | else: |
| | | self.consecutive_large_err = 0 |
| | | |
| | | # æ¥è¿ç»ç¹æ¶è°æ´é度 |
| | | if not self.in_recovery: |
| | | if in_slow_zone: |
| | | dist_scale = max(0.3, min(1.0, final_dist / self.slow_zone)) |
| | | if in_final_approach: |
| | | dist_scale *= max(0.4, final_dist / self.final_approach_dist) |
| | | target_speed *= dist_scale |
| | | |
| | | # å¨ç»ç¹éè¿å¢å 精度 |
| | | if in_final_approach: |
| | | target_speed *= 0.6 |
| | | if abs(xte) > 0.15 or abs(heading_err) > pi/10: |
| | | target_speed *= 0.7 |
| | | |
| | | # å¼ºå¶æå°é度ï¼é¿å
å®å
¨åæ»ï¼ |
| | | if self.stage == self.STAGE_FOLLOW_PATH and not self.in_recovery: |
| | | dynamic_min = self.min_follow_speed if in_slow_zone else self.min_follow_speed * 1.5 |
| | | if abs(heading_err) < 3.0 and target_speed < dynamic_min: |
| | | target_x, target_y = lx, ly |
| | | target_dist = hypot(target_x - x, target_y - y) |
| | | self.log_file.write(f'{timestamp:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},' |
| | | f'{xte:.3f},{heading_err:.3f},0,0,' |
| | | f'{dynamic_min:.3f},{self.stage},enforce_min_speed\n') |
| | | self.log_file.flush() |
| | | target_speed = dynamic_min |
| | | |
| | | # 软å¯å¨ï¼éå¶å é度 |
| | | accel = self.accel * (0.5 if in_slow_zone else 1.0) |
| | | speed_change = target_speed - self.current_speed |
| | | max_change = accel * dt |
| | | self.current_speed += max(-max_change, min(max_change, speed_change)) |
| | | |
| | | # å车æ§å¶ |
| | | if self.stage == self.STAGE_GOTO_START: |
| | | if abs(heading_err) > 2.9: |
| | | self.current_speed = -self.max_reverse_mps * 0.5 |
| | | else: |
| | | if abs(heading_err) > 2.9 and not self.in_recovery: |
| | | self.current_speed = -self.max_reverse_mps * 0.4 |
| | | if self.current_speed < 0 and abs(heading_err) < 2.5: |
| | | self.current_speed = 0.0 |
| | | |
| | | # ç»ç¹æ£æµï¼ç®åï¼åªåºäºè·ç¦»åç¹ç´¢å¼ï¼ |
| | | if final_dist < self.goal_tolerance: |
| | | self.finished = True |
| | | return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}} |
| | | |
| | | # æ©å±å®ææ¡ä»¶ï¼æ¥è¿ç»ç¹ä¸è¯¯å·®å° |
| | | if (final_dist < self.goal_tolerance * 1.5 and |
| | | self.current_target_idx >= self.n - 2 and |
| | | abs(heading_err) < 0.3 and abs(xte) < 0.2): |
| | | self.finished = True |
| | | return {'forward': 0, 'turn': 0, 'info': {'status': 'finished'}} |
| | | |
| | | # map to signals [-100,100] |
| | | # è®°å½æ å°åçé度信æ¯ä»¥ä¾¿è¯æ |
| | | log_t = timestamp if timestamp is not None else time.time() |
| | | pre_map_speed = self.current_speed |
| | | # è¯ææ¥å¿ï¼ç®åï¼ |
| | | target_x, target_y = lx, ly |
| | | target_dist = hypot(target_x - x, target_y - y) |
| | | pre_map_forward = int(round((pre_map_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0 |
| | | pre_map_forward = max(-100, min(100, pre_map_forward)) |
| | | pre_map_turn = int(round((yawrate_cmd / self.max_yawrate) * 100)) if self.max_yawrate != 0 else 0 |
| | | pre_map_turn = max(-100, min(100, pre_map_turn)) |
| | | self.log_file.write(f'{log_t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},' |
| | | f'{xte:.3f},{heading_err:.3f},{pre_map_forward},{pre_map_turn},{pre_map_speed:.3f},' |
| | | f'{self.stage},pre_map\n') |
| | | self.log_file.flush() |
| | | |
| | | forward_signal = int(round((self.current_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0 |
| | | forward_signal = max(-100, min(100, forward_signal)) |
| | | |
| | | # å¼ºå¶æå°åè¿ä¿¡å·ï¼é²æ¢å®å
¨åæ¢ï¼ |
| | | if forward_signal == 0 and not self.finished and abs(heading_err) < 3.0: |
| | | forced_signal = int(round((self.min_follow_speed / self.max_forward_mps) * 100)) if self.max_forward_mps != 0 else 0 |
| | | if forced_signal != 0: |
| | | target_x, target_y = lx, ly |
| | | target_dist = hypot(target_x - x, target_y - y) |
| | | self.log_file.write(f'{log_t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},' |
| | | f'{xte:.3f},{heading_err:.3f},{forced_signal},{int(round((yawrate_cmd / self.max_yawrate) * 100))},' |
| | | f'{self.min_follow_speed:.3f},{self.stage},force_min_forward\n') |
| | | self.log_file.flush() |
| | | forward_signal = forced_signal |
| | | self.current_speed = self.min_follow_speed |
| | | |
| | | turn_signal = int(round((yawrate_cmd / self.max_yawrate) * 100)) if self.max_yawrate != 0 else 0 |
| | | turn_signal = max(-100, min(100, turn_signal)) |
| | | |
| | | status = 'finished' if self.finished else 'running' |
| | | info = { |
| | | 'status': status, |
| | | 'pos': (x, y), |
| | | 'heading': self.heading, |
| | | 'desired_heading': desired_heading, |
| | | 'heading_err': heading_err, |
| | | 'current_speed': self.current_speed, |
| | | 'yawrate_cmd': yawrate_cmd, |
| | | 'target_xy': (lx, ly), |
| | | 'path_curvature': path_curvature, |
| | | 'final_dist': final_dist, |
| | | } |
| | | |
| | | # ç®åçæ¥å¿è®°å½ |
| | | t = log_t |
| | | target_x, target_y = lx, ly |
| | | target_dist = hypot(target_x - x, target_y - y) |
| | | self.log_file.write(f'{t:.3f},{x:.3f},{y:.3f},{self.heading:.3f},' |
| | | f'{target_x:.3f},{target_y:.3f},{target_dist:.3f},{self.current_target_idx},' |
| | | f'{xte:.3f},{heading_err:.3f},{forward_signal},{turn_signal},{self.current_speed:.3f},' |
| | | f'{self.stage},{status}\n') |
| | | self.log_file.flush() |
| | | |
| | | return {'forward': forward_signal, 'turn': turn_signal, 'info': info} |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | # quick manual test |
| | | import json |
| | | path = [[0,0],[5,0],[10,0]] |
| | | c = MowerController(path) |
| | | c.update_gps((0, -1.0), 0.0) |
| | | for i in range(20): |
| | | out = c.compute_control() |
| | | print(i, out) |
| | | # fake forward motion |
| | | f = out['forward'] / 100.0 * c.max_forward_mps |
| | | hd = c.heading + (out['turn'] / 100.0) * c.max_yawrate * 0.1 |
| | | px = c.pos[0] + f * cos(hd) * 0.1 |
| | | py = c.pos[1] + f * sin(hd) * 0.1 |
| | | c.update_gps((px,py), hd) |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | """ |
| | | Simple path planner to generate approach path from mower start position to planned path start. |
| | | """ |
| | | import numpy as np |
| | | from math import hypot, atan2, cos, sin, pi |
| | | |
| | | |
| | | def plan_approach_path(start_pos, start_heading, target_pos, target_heading, method='line'): |
| | | """ |
| | | Generate approach path from mower start to planned path start. |
| | | |
| | | Args: |
| | | start_pos: (x, y) mower starting position |
| | | start_heading: mower starting heading in radians |
| | | target_pos: (x, y) planned path starting position |
| | | target_heading: planned path starting heading in radians |
| | | method: 'line' | 'dubins' | 'smooth' |
| | | |
| | | Returns: |
| | | List of (x, y) waypoints forming the approach path |
| | | """ |
| | | sx, sy = start_pos |
| | | tx, ty = target_pos |
| | | dist = hypot(tx - sx, ty - sy) |
| | | |
| | | if dist < 0.05: # Already at start, no approach needed |
| | | return [start_pos, target_pos] |
| | | |
| | | if method == 'line': |
| | | # Simple straight line with waypoints every 0.5m |
| | | num_points = max(3, int(dist / 0.5) + 1) |
| | | path = [] |
| | | for i in range(num_points): |
| | | t = i / (num_points - 1) |
| | | x = sx + t * (tx - sx) |
| | | y = sy + t * (ty - sy) |
| | | path.append((x, y)) |
| | | return path |
| | | |
| | | elif method == 'smooth': |
| | | # Smooth curve considering start and target headings |
| | | # Use cubic bezier curve |
| | | # Control points based on headings |
| | | ctrl_dist = min(dist * 0.4, 2.0) # Control point distance |
| | | |
| | | # Start control point along start heading |
| | | c1x = sx + ctrl_dist * cos(start_heading) |
| | | c1y = sy + ctrl_dist * sin(start_heading) |
| | | |
| | | # End control point along target heading (backwards) |
| | | c2x = tx - ctrl_dist * cos(target_heading) |
| | | c2y = ty - ctrl_dist * sin(target_heading) |
| | | |
| | | # Generate bezier curve points |
| | | num_points = max(5, int(dist / 0.3) + 1) |
| | | path = [] |
| | | for i in range(num_points): |
| | | t = i / (num_points - 1) |
| | | # Cubic bezier formula |
| | | x = (1-t)**3 * sx + 3*(1-t)**2*t * c1x + 3*(1-t)*t**2 * c2x + t**3 * tx |
| | | y = (1-t)**3 * sy + 3*(1-t)**2*t * c1y + 3*(1-t)*t**2 * c2y + t**3 * ty |
| | | path.append((x, y)) |
| | | return path |
| | | |
| | | else: |
| | | raise ValueError(f"Unknown method: {method}") |
| | | |
| | | |
| | | def compute_path_heading(path, idx=0): |
| | | """Compute heading of path at given index.""" |
| | | if idx >= len(path) - 1: |
| | | idx = len(path) - 2 |
| | | if idx < 0: |
| | | idx = 0 |
| | | |
| | | dx = path[idx + 1][0] - path[idx][0] |
| | | dy = path[idx + 1][1] - path[idx][1] |
| | | return atan2(dy, dx) |
| | | |
| | | |
| | | def combine_paths(approach_path, work_path): |
| | | """ |
| | | Combine approach path and work path into single path with metadata. |
| | | |
| | | Returns: |
| | | combined_path: List of (x, y) waypoints |
| | | approach_end_idx: Index where approach path ends (work path starts) |
| | | """ |
| | | # Remove duplicate point at junction if exists |
| | | if len(approach_path) > 0 and len(work_path) > 0: |
| | | if hypot(approach_path[-1][0] - work_path[0][0], |
| | | approach_path[-1][1] - work_path[0][1]) < 0.01: |
| | | combined = approach_path[:-1] + work_path |
| | | approach_end_idx = len(approach_path) - 1 |
| | | else: |
| | | combined = approach_path + work_path |
| | | approach_end_idx = len(approach_path) |
| | | else: |
| | | combined = approach_path + work_path |
| | | approach_end_idx = len(approach_path) |
| | | |
| | | return combined, approach_end_idx |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | # Test path planning |
| | | import json |
| | | import matplotlib.pyplot as plt |
| | | |
| | | # Load planned path |
| | | with open('example_path.json', 'r') as f: |
| | | work_path = json.load(f) |
| | | |
| | | # Mower starts 2m away from path start |
| | | mower_start = (work_path[0][0] - 1.5, work_path[0][1] - 1.5) |
| | | mower_heading = pi / 4 # 45 degrees |
| | | |
| | | # Plan path heading |
| | | plan_heading = compute_path_heading(work_path, 0) |
| | | |
| | | # Generate approach paths with different methods |
| | | approach_line = plan_approach_path(mower_start, mower_heading, |
| | | work_path[0], plan_heading, 'line') |
| | | approach_smooth = plan_approach_path(mower_start, mower_heading, |
| | | work_path[0], plan_heading, 'smooth') |
| | | |
| | | # Plot comparison |
| | | fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6)) |
| | | |
| | | # Line method |
| | | ax1.plot([p[0] for p in work_path], [p[1] for p in work_path], |
| | | 'b-', linewidth=2, label='Work Path') |
| | | ax1.plot([p[0] for p in approach_line], [p[1] for p in approach_line], |
| | | 'r--', linewidth=2, marker='o', markersize=4, label='Approach (Line)') |
| | | ax1.plot(mower_start[0], mower_start[1], 'go', markersize=10, label='Mower Start') |
| | | ax1.arrow(mower_start[0], mower_start[1], 0.3*cos(mower_heading), 0.3*sin(mower_heading), |
| | | head_width=0.15, head_length=0.1, fc='g', ec='g') |
| | | ax1.set_xlabel('X (m)') |
| | | ax1.set_ylabel('Y (m)') |
| | | ax1.set_title('Line Approach') |
| | | ax1.legend() |
| | | ax1.grid(True) |
| | | ax1.axis('equal') |
| | | |
| | | # Smooth method |
| | | ax2.plot([p[0] for p in work_path], [p[1] for p in work_path], |
| | | 'b-', linewidth=2, label='Work Path') |
| | | ax2.plot([p[0] for p in approach_smooth], [p[1] for p in approach_smooth], |
| | | 'r--', linewidth=2, marker='o', markersize=4, label='Approach (Smooth)') |
| | | ax2.plot(mower_start[0], mower_start[1], 'go', markersize=10, label='Mower Start') |
| | | ax2.arrow(mower_start[0], mower_start[1], 0.3*cos(mower_heading), 0.3*sin(mower_heading), |
| | | head_width=0.15, head_length=0.1, fc='g', ec='g') |
| | | ax2.set_xlabel('X (m)') |
| | | ax2.set_ylabel('Y (m)') |
| | | ax2.set_title('Smooth Approach') |
| | | ax2.legend() |
| | | ax2.grid(True) |
| | | ax2.axis('equal') |
| | | |
| | | plt.tight_layout() |
| | | plt.savefig('approach_path_test.png', dpi=150) |
| | | print("Saved approach_path_test.png") |
| | | print(f"Line approach: {len(approach_line)} points") |
| | | print(f"Smooth approach: {len(approach_smooth)} points") |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | """ |
| | | Offline simulation and path visualization |
| | | - Run complete trajectory with approach path |
| | | - Plot final path and control signals |
| | | - Save detailed logs for analysis |
| | | """ |
| | | import json |
| | | import time |
| | | from math import cos, sin, pi, hypot, atan2 |
| | | import numpy as np |
| | | import matplotlib |
| | | # use non-interactive backend to avoid blocking GUI (run headless) |
| | | matplotlib.use('Agg') |
| | | import matplotlib.pyplot as plt |
| | | from mower_controller import MowerController, wrap_angle |
| | | from path_planner import plan_approach_path, compute_path_heading, combine_paths |
| | | |
| | | |
| | | def simulate_full_path(work_path, mower_start_pos, mower_start_heading, params=None, |
| | | approach_method='smooth'): |
| | | """ |
| | | Run complete simulation with approach path and return state history. |
| | | |
| | | Args: |
| | | work_path: Planned work path [(x, y), ...] |
| | | mower_start_pos: (x, y) mower starting position |
| | | mower_start_heading: mower starting heading in radians |
| | | params: Controller parameters dict |
| | | approach_method: 'line' or 'smooth' |
| | | """ |
| | | # Generate approach path from mower start to work path start |
| | | work_path_heading = compute_path_heading(work_path, 0) |
| | | approach_path = plan_approach_path(mower_start_pos, mower_start_heading, |
| | | work_path[0], work_path_heading, |
| | | method=approach_method) |
| | | |
| | | # Combine paths |
| | | combined_path, approach_end_idx = combine_paths(approach_path, work_path) |
| | | |
| | | print(f"Generated approach path: {len(approach_path)} points") |
| | | print(f"Work path: {len(work_path)} points") |
| | | print(f"Combined path: {len(combined_path)} points (approach ends at idx {approach_end_idx})") |
| | | |
| | | # Initialize controller with combined path |
| | | ctrl = MowerController(combined_path, params=params) |
| | | |
| | | # Initial state |
| | | x, y = mower_start_pos |
| | | heading = mower_start_heading |
| | | t = 0.0 |
| | | dt = 1.0/74.0 # control period |
| | | |
| | | # History for plotting |
| | | history = { |
| | | 't': [0.0], |
| | | 'x': [x], |
| | | 'y': [y], |
| | | 'heading': [heading], |
| | | 'forward': [0], |
| | | 'turn': [0], |
| | | 'approach_end_idx': approach_end_idx, # Store for visualization |
| | | 'approach_path': approach_path, |
| | | 'work_path': work_path, |
| | | } |
| | | |
| | | finished = False |
| | | while not finished and t < 300.0: # timeout after 300s |
| | | # Update controller |
| | | ctrl.update_gps((x, y), heading) |
| | | ctrl.update_imu((0,0,0), (0,0,0)) |
| | | |
| | | # Get control signals |
| | | out = ctrl.compute_control(t) |
| | | f_sig = out['forward'] |
| | | t_sig = out['turn'] |
| | | |
| | | # Convert to physical quantities |
| | | f_mps = (f_sig / 100.0) * ctrl.max_forward_mps |
| | | yawrate = (t_sig / 100.0) * ctrl.max_yawrate |
| | | |
| | | # Update state |
| | | heading = wrap_angle(heading + yawrate * dt) |
| | | x = x + f_mps * cos(heading) * dt |
| | | y = y + f_mps * sin(heading) * dt |
| | | t += dt |
| | | |
| | | # Record |
| | | history['t'].append(t) |
| | | history['x'].append(x) |
| | | history['y'].append(y) |
| | | history['heading'].append(heading) |
| | | history['forward'].append(f_sig) |
| | | history['turn'].append(t_sig) |
| | | |
| | | # Check completion |
| | | if out['info'].get('status') == 'finished': |
| | | finished = True |
| | | print(f'Reached goal in {t:.1f}s') |
| | | |
| | | if not finished: |
| | | print('Timeout!') |
| | | |
| | | return history |
| | | |
| | | |
| | | def plot_results(history): |
| | | """Plot path following results with approach and work paths.""" |
| | | approach_path = history['approach_path'] |
| | | work_path = history['work_path'] |
| | | approach_end_idx = history['approach_end_idx'] |
| | | |
| | | fig = plt.figure(figsize=(15, 10)) |
| | | gs = fig.add_gridspec(3, 2) |
| | | |
| | | # Path plot (larger, spans two rows) |
| | | ax_path = fig.add_subplot(gs[0:2, 0]) |
| | | |
| | | # Plot approach path (dashed orange) |
| | | apx = [p[0] for p in approach_path] |
| | | apy = [p[1] for p in approach_path] |
| | | ax_path.plot(apx, apy, '--', color='orange', lw=2, label='Approach Path', alpha=0.7) |
| | | ax_path.scatter(apx, apy, c='orange', s=15, alpha=0.5) |
| | | |
| | | # Plot work path (solid black) |
| | | wpx = [p[0] for p in work_path] |
| | | wpy = [p[1] for p in work_path] |
| | | ax_path.plot(wpx, wpy, '-k', lw=2, label='Work Path') |
| | | ax_path.scatter(wpx, wpy, c='k', s=20) |
| | | |
| | | # Plot actual trajectory |
| | | ax_path.plot(history['x'], history['y'], '-r', lw=1.5, label='Actual', alpha=0.8) |
| | | |
| | | # Add markers |
| | | ax_path.plot(history['x'][0], history['y'][0], 'go', markersize=12, |
| | | label='Mower Start', zorder=10) |
| | | ax_path.plot(work_path[0][0], work_path[0][1], 'bs', markersize=10, |
| | | label='Work Path Start', zorder=10) |
| | | ax_path.plot(history['x'][-1], history['y'][-1], 'ro', markersize=10, |
| | | label='End', zorder=10) |
| | | |
| | | ax_path.set_aspect('equal') |
| | | ax_path.grid(True, alpha=0.3) |
| | | ax_path.set_xlabel('X (m)') |
| | | ax_path.set_ylabel('Y (m)') |
| | | ax_path.set_title('Path Following Results (Approach + Work)') |
| | | ax_path.legend(loc='best') |
| | | |
| | | # XY position vs time |
| | | ax_xy = fig.add_subplot(gs[0:2, 1]) |
| | | ax_xy.plot(history['t'], history['x'], '-b', label='X') |
| | | ax_xy.plot(history['t'], history['y'], '-r', label='Y') |
| | | ax_xy.set_xlabel('Time (s)') |
| | | ax_xy.set_ylabel('Position (m)') |
| | | ax_xy.grid(True) |
| | | ax_xy.legend() |
| | | |
| | | # Control signals |
| | | ax_ctrl = fig.add_subplot(gs[2, :]) |
| | | ax_ctrl.plot(history['t'], history['forward'], '-b', label='Forward') |
| | | ax_ctrl.plot(history['t'], history['turn'], '-r', label='Turn') |
| | | ax_ctrl.set_xlabel('Time (s)') |
| | | ax_ctrl.set_ylabel('Control Signal') |
| | | ax_ctrl.grid(True) |
| | | ax_ctrl.legend() |
| | | |
| | | plt.tight_layout() |
| | | plt.savefig('simulation_results.png', dpi=300, bbox_inches='tight') |
| | | plt.show() |
| | | |
| | | |
| | | def main(): |
| | | # Load planned work path |
| | | try: |
| | | with open('example_path.json', 'r') as f: |
| | | work_path = json.load(f) |
| | | except Exception as e: |
| | | print(f'Failed to load path: {e}') |
| | | return |
| | | |
| | | # Define mower starting position (offset from work path start) |
| | | mower_start_pos = (work_path[0][0] + 3.5, work_path[0][1] + 1.0) |
| | | mower_start_heading = pi / 4 # 45 degrees |
| | | |
| | | print(f"Mower starts at: {mower_start_pos}, heading: {mower_start_heading:.2f} rad") |
| | | print(f"Work path starts at: {work_path[0]}") |
| | | |
| | | # Run simulation with stable high-precision parameters |
| | | params = { |
| | | # === æ ¸å¿æ§å¶å¢çï¼ä¿®æ£åï¼ === |
| | | 'k_heading': 2.0, # èªåå¢ç |
| | | 'k_xte': 0.8, # 横å误差å¢çï¼2.0 â 0.8 (大å¹
éä½ä»¥é²æ¢è¿åº¦ä¿®æ£) |
| | | 'k_heading_d': 0.1, # èªåå¾®åå¢ç |
| | | 'k_heading_i': 0.015, # èªå积åå¢ç |
| | | 'k_xte_i': 0.02, # 横å积åå¢ç |
| | | |
| | | # === é度æ§å¶ === |
| | | 'base_speed': 0.8, # åºç¡é度ï¼0.6 â 0.8 |
| | | 'max_forward_mps': 1.2, # æå¤§é度ï¼0.95 â 1.2 |
| | | 'min_follow_speed': 0.40, # æå°é度ï¼0.30 â 0.40 |
| | | 'max_reverse_mps': 0.25, |
| | | 'max_yawrate': pi/2.5, # 转åéçï¼pi/3 â pi/2.5 (æ´å¿«è½¬å) |
| | | |
| | | # === åè§è·ç¦» === |
| | | 'lookahead_min': 0.35, # æå°åè§ |
| | | 'lookahead_max': 1.2, # æå¤§åè§ |
| | | 'lookahead_time': 2.0, # åè§æ¶é´ç³»æ° |
| | | |
| | | # === 误差容å¿åº¦ === |
| | | 'max_xte':1.5, # æå¤§æ¨ªå误差ï¼0.8 â 1.0 (å¢å¤§ä»¥åå°å½ä¸åæ¾å¤§) |
| | | 'goal_tolerance': 0.1, # ç®æ å®¹å¿ |
| | | 'large_err_threshold': 0.5, # 大误差éå¼ |
| | | |
| | | # === æ»¤æ³¢åæ° === |
| | | 'yawrate_alpha': 0.85, # è½¬åæ»¤æ³¢ |
| | | |
| | | # === é度缩æ¾å¢ç === |
| | | 'curv_gain_scale': 2.0, # æ²çç¼©æ¾ |
| | | 'heading_err_gain_scale': 1.5, # èªåè¯¯å·®ç¼©æ¾ |
| | | 'xte_gain_scale': 1.0, # 横å误差缩æ¾ï¼1.2 â 1.0 (ç§»é¤é¢å¤æ¾å¤§) |
| | | |
| | | # === ç»ç¹åéåºå === |
| | | 'slow_zone': 1.5, |
| | | 'final_approach_dist': 0.8, |
| | | |
| | | # === èµ·ç¹é¶æ®µ === |
| | | 'start_min_speed': 0.28, |
| | | 'start_slow_speed': 0.38, |
| | | |
| | | # === æ¢å¤æ¨¡å¼ === |
| | | 'recovery_dist': 2.5, |
| | | 'recovery_exit_dist': 0.6, |
| | | 'recovery_speed': 0.6, |
| | | } |
| | | |
| | | history = simulate_full_path(work_path, mower_start_pos, mower_start_heading, |
| | | params, approach_method='smooth') |
| | | plot_results(history) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | main() |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | """ |
| | | Simple image viewer for simulation results |
| | | """ |
| | | from PIL import Image |
| | | import matplotlib.pyplot as plt |
| | | |
| | | # Open the saved simulation image |
| | | img = Image.open('simulation_results.png') |
| | | |
| | | # Display |
| | | plt.figure(figsize=(16, 10)) |
| | | plt.imshow(img) |
| | | plt.axis('off') |
| | | plt.title('Simulation Results with Approach Path') |
| | | plt.tight_layout() |
| | | plt.savefig('view_results.png', dpi=150, bbox_inches='tight') |
| | | print("Saved view_results.png") |
| | | print(f"Image size: {img.size}") |
| | | print("\nâ Approach path (orange dashed) + Work path (black) are now visualized") |
| | | print("â Mower starts from offset position and follows approach path to work path start") |
| | | print("\nâ ï¸ Note: Trajectory still has issues after ~40s due to segment progression bug") |
| | | print(" This is the existing Pure Pursuit restoration issue, not related to approach path") |
| ¶Ô±ÈÐÂÎļþ |
| | |
| | | """ |
| | | viz.py |
| | | |
| | | 读å sim_log.csvï¼ç± test.py çæï¼å¹¶ç¨ matplotlib å¨ç»æ¾ç¤ºï¼ |
| | | - 主å¾ï¼è·¯å¾ç¹ï¼`example_path.json`ï¼åå²èæºå½åä½ç½®/èªåç®å¤´ |
| | | - 䏿¹ä¸¤ä¸ªåå¾ï¼forward_sig ä¸ turn_sig éæ¶é´çæ²çº¿ï¼å½å帧ç¨ç«çº¿æ 示 |
| | | |
| | | è¿è¡ï¼ |
| | | python viz.py |
| | | |
| | | å¦ææ²¡æ `sim_log.csv`ï¼èæ¬ä¼æç¤ºå
è¿è¡ `test.py` çææ°æ®ã |
| | | """ |
| | | import os |
| | | import json |
| | | import csv |
| | | import sys |
| | | from math import cos, sin |
| | | import numpy as np |
| | | |
| | | try: |
| | | import matplotlib.pyplot as plt |
| | | from matplotlib.animation import FuncAnimation |
| | | except Exception as e: |
| | | print('matplotlib required. Install with: pip install matplotlib') |
| | | raise |
| | | |
| | | |
| | | SIM_CSV = 'sim_log.csv' |
| | | PATH_FILE = 'example_path.json' |
| | | |
| | | |
| | | def load_sim(csv_path=SIM_CSV): |
| | | if not os.path.exists(csv_path): |
| | | print(f"Sim log '{csv_path}' not found. Run test.py first to generate it.") |
| | | sys.exit(1) |
| | | t, xs, ys, headings, forward_sig, turn_sig = [], [], [], [], [], [] |
| | | with open(csv_path, 'r', newline='') as f: |
| | | reader = csv.DictReader(f) |
| | | for row in reader: |
| | | t.append(float(row['t'])) |
| | | xs.append(float(row['x'])) |
| | | ys.append(float(row['y'])) |
| | | headings.append(float(row['heading'])) |
| | | forward_sig.append(int(row['forward_sig'])) |
| | | turn_sig.append(int(row['turn_sig'])) |
| | | return {'t': t, 'x': xs, 'y': ys, 'heading': headings, 'forward': forward_sig, 'turn': turn_sig} |
| | | |
| | | |
| | | def load_path(pth=PATH_FILE): |
| | | if not os.path.exists(pth): |
| | | return [] |
| | | with open(pth, 'r') as f: |
| | | return json.load(f) |
| | | |
| | | |
| | | def animate_sim(simdata, path): |
| | | t = simdata['t'] |
| | | x = simdata['x'] |
| | | y = simdata['y'] |
| | | h = simdata['heading'] |
| | | fwd = simdata['forward'] |
| | | trn = simdata['turn'] |
| | | |
| | | fig = plt.figure(figsize=(10, 8)) |
| | | gs = fig.add_gridspec(3, 1, height_ratios=[3, 1, 1]) |
| | | |
| | | ax_map = fig.add_subplot(gs[0]) |
| | | ax_f = fig.add_subplot(gs[1]) |
| | | ax_t = fig.add_subplot(gs[2]) |
| | | |
| | | # map plot |
| | | if path: |
| | | px = [p[0] for p in path] |
| | | py = [p[1] for p in path] |
| | | ax_map.plot(px, py, '-k', lw=1, label='planned path') |
| | | ax_map.scatter(px, py, c='k', s=10) |
| | | |
| | | line_pos, = ax_map.plot([], [], '-r', lw=2, label='actual') |
| | | point_pos, = ax_map.plot([], [], 'ro') |
| | | # heading arrow using quiver (initialize at first pose if available) |
| | | if len(x) > 0 and len(h) > 0: |
| | | quiv = ax_map.quiver([x[0]], [y[0]], [cos(h[0])], [sin(h[0])], angles='xy', scale_units='xy', scale=4, color='b') |
| | | else: |
| | | quiv = ax_map.quiver([0], [0], [1], [0], angles='xy', scale_units='xy', scale=4, color='b') |
| | | |
| | | ax_map.set_aspect('equal', 'box') |
| | | ax_map.set_title('Mower path and pose') |
| | | ax_map.legend() |
| | | |
| | | # forward subplot |
| | | ax_f.plot(t, fwd, color='gray', alpha=0.5) |
| | | f_line = ax_f.axvline(t[0], color='r') |
| | | ax_f.set_ylabel('forward_sig') |
| | | |
| | | # turn subplot |
| | | ax_t.plot(t, trn, color='gray', alpha=0.5) |
| | | t_line = ax_t.axvline(t[0], color='r') |
| | | ax_t.set_ylabel('turn_sig') |
| | | ax_t.set_xlabel('time (s)') |
| | | |
| | | # set map limits |
| | | margin = 1.0 |
| | | all_x = x + ( [p[0] for p in path] if path else [] ) |
| | | all_y = y + ( [p[1] for p in path] if path else [] ) |
| | | if all_x and all_y: |
| | | ax_map.set_xlim(min(all_x)-margin, max(all_x)+margin) |
| | | ax_map.set_ylim(min(all_y)-margin, max(all_y)+margin) |
| | | |
| | | def init(): |
| | | line_pos.set_data([], []) |
| | | point_pos.set_data([], []) |
| | | return line_pos, point_pos, quiv, f_line, t_line |
| | | |
| | | def update(i): |
| | | # i is frame index |
| | | xi = x[:i+1] |
| | | yi = y[:i+1] |
| | | line_pos.set_data(xi, yi) |
| | | point_pos.set_data([x[i]], [y[i]]) |
| | | |
| | | # update quiver in-place |
| | | hx = cos(h[i]) |
| | | hy = sin(h[i]) |
| | | # set new offset and vector |
| | | quiv.set_offsets(np.array([[x[i], y[i]]])) |
| | | quiv.set_UVC(np.array([hx]), np.array([hy])) |
| | | |
| | | # update vertical lines |
| | | f_line.set_xdata(t[i]) |
| | | t_line.set_xdata(t[i]) |
| | | |
| | | return line_pos, point_pos, quiv, f_line, t_line |
| | | |
| | | interval_ms = max(10, int(1000.0 / 74.0)) |
| | | ani = FuncAnimation(fig, update, frames=len(t), init_func=init, blit=False, interval=interval_ms) |
| | | plt.tight_layout() |
| | | plt.show() |
| | | |
| | | |
| | | def main(): |
| | | sim = load_sim() |
| | | path = load_path() |
| | | animate_sim(sim, path) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | main() |