yincheng.zhong
3 天以前 30303d366d1a0d857357c90bed876686f2d1e603
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
实时闭环控制系统
接收 GPS/IMU 数据 → 运动控制算法 → 下发控制命令
"""
 
import serial
import struct
import time
import json
import math
from typing import Optional
from gps_imu_receiver import GPSIMUReceiver, GPSData, IMUData
from mower_controller import MowerController, wrap_angle
 
 
class ControlCommandSender:
    """控制命令发送器 - 发送转向和油门控制信号给STM32"""
    
    # 协议常量
    FRAME_HEADER1 = 0xAA
    FRAME_HEADER2 = 0x55
    TYPE_CONTROL = 0x10      # 控制命令类型
    FRAME_FOOTER1 = 0x0D
    FRAME_FOOTER2 = 0x0A
    
    def __init__(self, serial_port: serial.Serial):
        """
        初始化控制命令发送器
        
        Args:
            serial_port: 已打开的串口对象
        """
        self.serial = serial_port
        self.send_count = 0
        
    def send_control_command(self, steering_pwm: int, throttle_pwm: int) -> bool:
        """
        发送控制命令到STM32
        
        协议格式: AA 55 10 04 00 STEER(2) THROTTLE(2) CHECKSUM(2) 0D 0A
        
        Args:
            steering_pwm: 转向PWM值 (1000-2000us)
            throttle_pwm: 油门PWM值 (1000-2000us)
            
        Returns:
            发送成功返回True
        """
        try:
            # 限制PWM范围
            steering_pwm = max(1000, min(2000, steering_pwm))
            throttle_pwm = max(1000, min(2000, throttle_pwm))
            
            # 构建数据包
            # 帧头 (2B) + 类型 (1B) + 长度 (2B) + 数据 (4B) + 校验和 (2B) + 帧尾 (2B)
            data_len = 4  # 2字节转向 + 2字节油门
            
            payload = struct.pack('<HH', steering_pwm, throttle_pwm)
            
            # 构建完整帧用于计算校验和
            checksum_data = (
                bytes([self.TYPE_CONTROL]) +
                struct.pack('<H', data_len) +
                payload
            )
 
            # 计算16位累加和校验
            checksum = sum(checksum_data) & 0xFFFF
 
            # 完整数据包
            packet = (
                bytes([self.FRAME_HEADER1, self.FRAME_HEADER2]) +
                checksum_data +
                struct.pack('<H', checksum) +
                bytes([self.FRAME_FOOTER1, self.FRAME_FOOTER2])
            )
            
            # 发送
            self.serial.write(packet)
            self.serial.flush()
            
            self.send_count += 1
            return True
            
        except Exception as e:
            print(f"[ERROR] 控制命令发送失败: {e}")
            return False
    
    def get_stats(self):
        """获取发送统计"""
        return self.send_count
 
 
class RealtimeController:
    """实时控制器 - 整合GPS/IMU接收、控制算法和命令发送"""
    
    def __init__(self, port: str, baudrate: int = 921600):
        """
        初始化实时控制器
        
        Args:
            port: 串口设备名
            baudrate: 波特率
        """
        self.port = port
        self.baudrate = baudrate
        
        # GPS/IMU接收器
        self.receiver = GPSIMUReceiver(port, baudrate)
        
        # 控制命令发送器 (共用同一个串口)
        self.cmd_sender = None
        
        # 运动控制器
        self.mower_ctrl = None
        
        # 状态变量
        self.last_gps_time = 0
        self.last_imu_time = 0
        self.control_period = 1.0 / 74.0  # 74Hz控制频率
        self.last_control_time = 0
        
        # 数据缓存
        self.latest_gps: Optional[GPSData] = None
        self.latest_imu: Optional[IMUData] = None
        
        # 统计
        self.control_count = 0
        
    def load_path(self, path_file: str) -> bool:
        """
        加载路径文件并初始化控制器
        
        Args:
            path_file: JSON路径文件
            
        Returns:
            成功返回True
        """
        try:
            with open(path_file, 'r') as f:
                path_points = json.load(f)
            
            print(f"[INFO] 加载路径: {len(path_points)} 个点")
            
            # 使用sim_offline.py中的高精度参数
            params = {
                # 核心控制增益
                'k_heading': 2.0,
                'k_xte': 0.8,
                'k_heading_d': 0.1,
                'k_heading_i': 0.015,
                'k_xte_i': 0.02,
                
                # 速度控制
                'base_speed': 0.8,
                'max_forward_mps': 1.2,
                'min_follow_speed': 0.40,
                'max_reverse_mps': 0.25,
                'max_yawrate': math.pi/2.5,
                
                # 前视距离
                'lookahead_min': 0.35,
                'lookahead_max': 1.2,
                'lookahead_time': 2.0,
                
                # 误差容忍度
                'max_xte': 1.5,
                'goal_tolerance': 0.1,
                'large_err_threshold': 0.5,
                
                # 滤波参数
                'yawrate_alpha': 0.85,
                
                # 速度缩放增益
                'curv_gain_scale': 2.0,
                'heading_err_gain_scale': 1.5,
                'xte_gain_scale': 1.0,
                
                # 终点减速区域
                'slow_zone': 1.5,
                'final_approach_dist': 0.8,
            }
            
            self.mower_ctrl = MowerController(path_points, params=params)
            print("[INFO] 运动控制器初始化完成")
            return True
            
        except Exception as e:
            print(f"[ERROR] 加载路径失败: {e}")
            return False
    
    def connect(self) -> bool:
        """连接串口"""
        if not self.receiver.connect():
            return False
        
        # 创建控制命令发送器 (共用串口对象)
        self.cmd_sender = ControlCommandSender(self.receiver.serial)
        print("[INFO] 控制命令发送器就绪")
        return True
    
    def disconnect(self):
        """断开连接"""
        self.receiver.disconnect()
    
    def _convert_gps_to_local(self, gps: GPSData) -> tuple:
        """
        将GPS数据转换为本地坐标 (简化版,实际需要根据起点转换)
        
        Returns:
            (x, y, heading_rad, vx, vy)
        """
        # TODO: 实际应用中需要将经纬度转换为相对起点的XY坐标
        # 这里简化处理,假设GPS已提供相对坐标或使用UTM转换
        
        # 航向角转换为弧度 (GPS输出0-360°,北为0°顺时针)
        # 需要转换为数学坐标系 (东为0°逆时针)
        heading_rad = math.radians(90 - gps.heading_angle)
        heading_rad = wrap_angle(heading_rad)
        
        # 速度分量 (GPS提供东北天坐标系)
        vx = gps.east_velocity   # 东向速度 = X轴速度
        vy = gps.north_velocity  # 北向速度 = Y轴速度
        
        # 位置 (简化,实际需要坐标转换)
        x = gps.longitude * 111320.0 * math.cos(math.radians(gps.latitude))  # 近似米
        y = gps.latitude * 110540.0  # 近似米
        
        return (x, y, heading_rad, vx, vy)
    
    def _control_to_pwm(self, forward: int, turn: int) -> tuple:
        """
        将控制信号转换为PWM值
        
        Args:
            forward: 前进信号 (-100 到 100)
            turn: 转向信号 (-100 到 100, 负为右转, 正为左转)
            
        Returns:
            (steering_pwm, throttle_pwm) 单位us
        """
        # 油门: -100~100 映射到 1000~2000us, 中位1500us
        throttle_pwm = int(1500 + forward * 5.0)
        
        # 转向: -100~100 映射到 1000~2000us, 中位1500us
        # 注意: 正值左转(1000), 负值右转(2000)
        steering_pwm = int(1500 - turn * 5.0)
        
        return (steering_pwm, throttle_pwm)
    
    def run(self):
        """运行实时控制循环"""
        if not self.mower_ctrl:
            print("[ERROR] 请先加载路径文件")
            return
        
        print("[INFO] 开始实时控制... (按 Ctrl+C 停止)")
        print("[INFO] 等待GPS数据...")
        
        try:
            start_time = time.time()
            last_stats_time = start_time
            
            # 等待第一个有效的GPS数据
            gps_ready = False
            while not gps_ready:
                gps_data, imu_data = self.receiver.receive_packet()
                
                if gps_data:
                    if gps_data.position_quality > 0 and gps_data.satellite_count >= 4:
                        self.latest_gps = gps_data
                        gps_ready = True
                        print(f"[INFO] GPS就绪: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}")
                    else:
                        print(f"[WARN] GPS质量不足: 质量={gps_data.position_quality}, 卫星={gps_data.satellite_count}")
                
                if imu_data:
                    self.latest_imu = imu_data
            
            print("[INFO] 开始控制循环")
            
            while True:
                current_time = time.time()
                
                # 接收数据 (非阻塞式,快速轮询)
                gps_data, imu_data = self.receiver.receive_packet()
                
                if gps_data:
                    self.latest_gps = gps_data
                    self.last_gps_time = current_time
                
                if imu_data:
                    self.latest_imu = imu_data
                    self.last_imu_time = current_time
                
                # 控制周期检查 (74Hz)
                if current_time - self.last_control_time >= self.control_period:
                    if self.latest_gps:
                        # 转换GPS数据到本地坐标
                        x, y, heading, vx, vy = self._convert_gps_to_local(self.latest_gps)
                        
                        # 更新控制器状态
                        self.mower_ctrl.update_gps((x, y), heading, (vx, vy), current_time)
                        
                        if self.latest_imu:
                            accel = (self.latest_imu.accel_x, self.latest_imu.accel_y, self.latest_imu.accel_z)
                            gyro = (self.latest_imu.gyro_x, self.latest_imu.gyro_y, self.latest_imu.gyro_z)
                            self.mower_ctrl.update_imu(accel, gyro, current_time)
                        
                        # 计算控制信号
                        control_output = self.mower_ctrl.compute_control(current_time)
                        forward_signal = control_output['forward']  # -100~100
                        turn_signal = control_output['turn']        # -100~100
                        
                        # 转换为PWM并发送
                        steering_pwm, throttle_pwm = self._control_to_pwm(forward_signal, turn_signal)
                        self.cmd_sender.send_control_command(steering_pwm, throttle_pwm)
                        
                        self.control_count += 1
                        self.last_control_time = current_time
                        
                        # 打印控制信息 (降低频率)
                        if self.control_count % 74 == 0:  # 每秒打印一次
                            status = control_output['info'].get('status', 'running')
                            xte = control_output['info'].get('xte', 0)
                            heading_err_deg = math.degrees(control_output['info'].get('heading_err', 0))
                            
                            print(f"[CTRL] 状态: {status}, "
                                  f"前进: {forward_signal:+4d}, 转向: {turn_signal:+4d}, "
                                  f"PWM: S={steering_pwm}, T={throttle_pwm}, "
                                  f"横向误差: {xte:.2f}m, 航向误差: {heading_err_deg:+.1f}°")
                        
                        # 检查是否完成
                        if control_output['info'].get('status') == 'finished':
                            print("[INFO] 路径跟踪完成!")
                            break
                
                # 统计信息 (每10秒)
                if current_time - last_stats_time > 10.0:
                    print(f"\n========== 运行统计 ==========")
                    print(f"运行时间: {current_time - start_time:.1f}s")
                    print(f"控制次数: {self.control_count}")
                    print(f"GPS数据包: {self.receiver.gps_count}")
                    print(f"IMU数据包: {self.receiver.imu_count}")
                    print(f"命令发送: {self.cmd_sender.get_stats()}")
                    print(f"错误计数: {self.receiver.error_count}")
                    
                    if self.latest_gps:
                        print(f"GPS状态: 质量={self.latest_gps.position_quality}, "
                              f"卫星={self.latest_gps.satellite_count}, "
                              f"航向={self.latest_gps.heading_angle:.1f}°")
                    
                    print(f"==============================\n")
                    last_stats_time = current_time
        
        except KeyboardInterrupt:
            print("\n[INFO] 用户中断")
        
        finally:
            # 发送停止命令 (中位值)
            print("[INFO] 发送停止命令...")
            for _ in range(5):
                self.cmd_sender.send_control_command(1500, 1500)
                time.sleep(0.02)
            
            # 打印最终统计
            print("\n========== 最终统计 ==========")
            print(f"总控制次数: {self.control_count}")
            print(f"GPS数据包: {self.receiver.gps_count}")
            print(f"IMU数据包: {self.receiver.imu_count}")
            print(f"命令发送: {self.cmd_sender.get_stats()}")
            print(f"错误计数: {self.receiver.error_count}")
            print("==============================\n")
 
 
def main():
    """主函数"""
    # 配置
    PORT = "COM17"  # 根据实际情况修改
    BAUDRATE = 921600
    PATH_FILE = "example_path.json"  # 路径文件
    
    # 创建控制器
    controller = RealtimeController(PORT, BAUDRATE)
    
    # 加载路径
    if not controller.load_path(PATH_FILE):
        print("[ERROR] 路径加载失败,退出")
        return
    
    # 连接串口
    if not controller.connect():
        print("[ERROR] 串口连接失败,退出")
        return
    
    # 运行控制循环
    try:
        controller.run()
    finally:
        controller.disconnect()
 
 
if __name__ == "__main__":
    main()