#!/usr/bin/env python3
|
# -*- coding: utf-8 -*-
|
"""
|
车辆运动模型校准数据分析工具
|
解析$CAL格式日志,提取差速运动模型参数
|
"""
|
|
import argparse
|
import json
|
import re
|
import sys
|
from pathlib import Path
|
from typing import List, Dict, Tuple
|
import numpy as np
|
import matplotlib
|
import matplotlib.pyplot as plt
|
from scipy import optimize
|
from collections import defaultdict
|
|
# 可选:配置中文字体以避免警告
|
# 如果图表中文显示异常,可运行: python tools/fix_matplotlib_font.py 测试
|
try:
|
import platform
|
system = platform.system()
|
if system == 'Windows':
|
matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'SimSun']
|
elif system == 'Darwin':
|
matplotlib.rcParams['font.sans-serif'] = ['PingFang SC', 'Heiti SC']
|
else:
|
matplotlib.rcParams['font.sans-serif'] = ['WenQuanYi Micro Hei', 'Noto Sans CJK SC']
|
matplotlib.rcParams['axes.unicode_minus'] = False
|
except:
|
pass # 字体配置失败也不影响功能,只是可能有警告
|
|
# 添加项目路径
|
sys.path.append(str(Path(__file__).parent.parent))
|
from hitl.geo import geo_to_ecef, ecef_to_enu
|
|
|
class CalibrationData:
|
"""校准数据点"""
|
def __init__(self, line: str):
|
# $CAL,seq,time_ms,state,throttle_pwm,steering_pwm,lat,lon,alt,hdg,pitch,roll,gx,gy,gz,ax,ay,az
|
parts = line.strip().split(',')
|
if len(parts) < 18:
|
raise ValueError(f"Invalid CAL line: {line}")
|
|
self.seq = int(parts[1])
|
self.time_ms = int(parts[2])
|
self.state = parts[3]
|
self.throttle_pwm = int(parts[4])
|
self.steering_pwm = int(parts[5])
|
self.lat = float(parts[6])
|
self.lon = float(parts[7])
|
self.alt = float(parts[8])
|
self.hdg = float(parts[9])
|
self.pitch = float(parts[10])
|
self.roll = float(parts[11])
|
self.gx = float(parts[12]) / 1000.0 # 毫度/秒 -> 度/秒
|
self.gy = float(parts[13]) / 1000.0
|
self.gz = float(parts[14]) / 1000.0
|
self.ax = float(parts[15]) / 1000.0 # 毫g -> g
|
self.ay = float(parts[16]) / 1000.0
|
self.az = float(parts[17]) / 1000.0
|
|
# 待计算字段
|
self.enu_x = 0.0
|
self.enu_y = 0.0
|
self.enu_z = 0.0
|
self.velocity = 0.0 # 前向速度 m/s
|
self.yaw_rate = 0.0 # 角速度 rad/s
|
|
|
class CalibrationAnalyzer:
|
"""校准数据分析器"""
|
|
def __init__(self, log_file: str):
|
self.log_file = log_file
|
self.data: List[CalibrationData] = []
|
self.origin_lat = None
|
self.origin_lon = None
|
self.origin_alt = None
|
self.states_data: Dict[str, List[CalibrationData]] = defaultdict(list)
|
|
def parse_log(self):
|
"""解析日志文件"""
|
print(f"解析日志文件: {self.log_file}")
|
|
with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
|
for line in f:
|
if line.startswith('$CAL'):
|
try:
|
data = CalibrationData(line)
|
self.data.append(data)
|
except Exception as e:
|
print(f"警告: 解析行失败: {e}")
|
continue
|
|
if not self.data:
|
raise ValueError("未找到有效的$CAL数据")
|
|
print(f"成功解析 {len(self.data)} 条数据点")
|
|
# 使用第一个点作为ENU原点
|
self.origin_lat = self.data[0].lat
|
self.origin_lon = self.data[0].lon
|
self.origin_alt = self.data[0].alt
|
print(f"ENU原点: lat={self.origin_lat:.6f}, lon={self.origin_lon:.6f}, alt={self.origin_alt:.2f}")
|
|
def compute_enu_and_velocity(self):
|
"""计算ENU坐标和速度"""
|
print("计算ENU坐标和速度...")
|
|
origin_ecef = geo_to_ecef(self.origin_lat, self.origin_lon, self.origin_alt)
|
origin_lat_rad = np.deg2rad(self.origin_lat)
|
origin_lon_rad = np.deg2rad(self.origin_lon)
|
|
for i, data in enumerate(self.data):
|
# 计算ENU坐标
|
ecef = geo_to_ecef(data.lat, data.lon, data.alt)
|
dx = ecef[0] - origin_ecef[0]
|
dy = ecef[1] - origin_ecef[1]
|
dz = ecef[2] - origin_ecef[2]
|
enu = ecef_to_enu(dx, dy, dz, origin_lat_rad, origin_lon_rad)
|
data.enu_x, data.enu_y, data.enu_z = enu
|
|
# 计算速度(相邻点差分)
|
if i > 0:
|
dt = (data.time_ms - self.data[i-1].time_ms) / 1000.0 # 秒
|
if dt > 0:
|
dx = data.enu_x - self.data[i-1].enu_x
|
dy = data.enu_y - self.data[i-1].enu_y
|
data.velocity = np.sqrt(dx**2 + dy**2) / dt
|
|
# 角速度(从航向角差分)
|
dhdg = data.hdg - self.data[i-1].hdg
|
# 处理角度跳变
|
if dhdg > 180:
|
dhdg -= 360
|
elif dhdg < -180:
|
dhdg += 360
|
data.yaw_rate = np.deg2rad(dhdg) / dt
|
|
# 按状态分组
|
for data in self.data:
|
self.states_data[data.state].append(data)
|
|
print(f"数据分组完成,共 {len(self.states_data)} 个状态")
|
for state, dlist in self.states_data.items():
|
print(f" {state}: {len(dlist)} 个点")
|
|
def analyze_forward_model(self) -> Dict:
|
"""分析前向运动模型"""
|
print("\n=== 分析前向运动模型 ===")
|
|
results = {}
|
|
# 分析巡航阶段的PWM-速度关系
|
cruise_states = ['CRUISE_LOW', 'CRUISE_MID', 'CRUISE_HIGH']
|
pwm_velocity_pairs = []
|
|
for state in cruise_states:
|
if state in self.states_data:
|
dlist = self.states_data[state]
|
# 取后半段数据(稳态)
|
stable_data = dlist[len(dlist)//2:]
|
if stable_data:
|
avg_pwm = np.mean([d.throttle_pwm for d in stable_data])
|
avg_vel = np.mean([d.velocity for d in stable_data])
|
pwm_velocity_pairs.append((avg_pwm, avg_vel))
|
print(f"{state}: PWM={avg_pwm:.0f}, 速度={avg_vel:.2f} m/s")
|
|
# 拟合线性模型: v = k * (pwm - pwm_center) + bias
|
if len(pwm_velocity_pairs) >= 2:
|
pwms = np.array([p[0] for p in pwm_velocity_pairs])
|
vels = np.array([p[1] for p in pwm_velocity_pairs])
|
|
# 线性拟合
|
def linear_model(pwm, k, bias):
|
return k * (pwm - 1500) + bias
|
|
popt, _ = optimize.curve_fit(linear_model, pwms, vels)
|
k_forward, bias = popt
|
|
results['pwm_to_velocity'] = {
|
'model': 'linear',
|
'k': float(k_forward),
|
'bias': float(bias),
|
'formula': f'v = {k_forward:.6f} * (pwm - 1500) + {bias:.6f}',
|
'pwm_velocity_pairs': [(float(p), float(v)) for p, v in pwm_velocity_pairs]
|
}
|
print(f"PWM-速度模型: v = {k_forward:.6f} * (pwm - 1500) + {bias:.6f}")
|
|
# 分析加速度
|
accel_states = ['ACCEL_LOW', 'ACCEL_MID', 'ACCEL_HIGH']
|
accel_results = {}
|
|
for state in accel_states:
|
if state in self.states_data:
|
dlist = self.states_data[state]
|
if len(dlist) > 5:
|
velocities = [d.velocity for d in dlist]
|
times = [(d.time_ms - dlist[0].time_ms) / 1000.0 for d in dlist]
|
|
# 线性拟合速度-时间
|
if len(times) > 1:
|
coeffs = np.polyfit(times, velocities, 1)
|
accel = coeffs[0]
|
accel_results[state] = float(accel)
|
print(f"{state}: 加速度 = {accel:.3f} m/s²")
|
|
results['acceleration'] = accel_results
|
|
# 分析制动
|
if 'BRAKE' in self.states_data:
|
dlist = self.states_data['BRAKE']
|
if len(dlist) > 5:
|
velocities = [d.velocity for d in dlist]
|
times = [(d.time_ms - dlist[0].time_ms) / 1000.0 for d in dlist]
|
|
if len(times) > 1:
|
coeffs = np.polyfit(times, velocities, 1)
|
decel = coeffs[0]
|
results['deceleration'] = float(decel)
|
print(f"制动减速度: {decel:.3f} m/s²")
|
|
return results
|
|
def analyze_steering_model(self) -> Dict:
|
"""分析转向模型"""
|
print("\n=== 分析转向模型 ===")
|
|
results = {}
|
|
turn_states = ['TURN_LEFT_LIGHT', 'TURN_RIGHT_LIGHT', 'TURN_LEFT_HEAVY', 'TURN_RIGHT_HEAVY']
|
|
pwm_yawrate_pairs = []
|
|
for state in turn_states:
|
if state in self.states_data:
|
dlist = self.states_data[state]
|
# 取中间段数据(稳态转向)
|
start_idx = len(dlist) // 4
|
end_idx = 3 * len(dlist) // 4
|
stable_data = dlist[start_idx:end_idx]
|
|
if stable_data:
|
avg_steering_pwm = np.mean([d.steering_pwm for d in stable_data])
|
avg_yaw_rate = np.mean([d.yaw_rate for d in stable_data])
|
avg_velocity = np.mean([d.velocity for d in stable_data])
|
|
# 转向PWM偏移量(1500为中心)
|
steering_offset = avg_steering_pwm - 1500
|
|
pwm_yawrate_pairs.append((steering_offset, avg_yaw_rate, avg_velocity))
|
|
turning_radius = abs(avg_velocity / avg_yaw_rate) if abs(avg_yaw_rate) > 0.01 else float('inf')
|
print(f"{state}: 舵机PWM={avg_steering_pwm:.0f} ({steering_offset:+.0f}), "
|
f"角速度={avg_yaw_rate:.3f} rad/s, 速度={avg_velocity:.2f} m/s, "
|
f"转向半径={turning_radius:.2f} m")
|
|
# 拟合转向模型: ω = k_steering * steering_offset
|
if len(pwm_yawrate_pairs) >= 2:
|
offsets = np.array([p[0] for p in pwm_yawrate_pairs])
|
yaw_rates = np.array([p[1] for p in pwm_yawrate_pairs])
|
|
# 线性拟合(过原点)
|
k_steering = np.sum(offsets * yaw_rates) / np.sum(offsets**2)
|
|
results['pwm_to_yaw_rate'] = {
|
'model': 'linear',
|
'k': float(k_steering),
|
'formula': f'ω = {k_steering:.8f} * (steering_pwm - 1500)',
|
'data': [(float(o), float(y), float(v)) for o, y, v in pwm_yawrate_pairs]
|
}
|
print(f"转向模型: ω = {k_steering:.8f} * (steering_pwm - 1500)")
|
|
return results
|
|
def analyze_delays(self) -> Dict:
|
"""分析响应延迟"""
|
print("\n=== 分析响应延迟 ===")
|
|
results = {}
|
|
# 检测WARMUP到ACCEL_LOW的响应延迟
|
if 'WARMUP' in self.states_data and 'ACCEL_LOW' in self.states_data:
|
warmup_last = self.states_data['WARMUP'][-1]
|
accel_first = self.states_data['ACCEL_LOW'][0]
|
|
# 找到速度开始上升的点
|
threshold_velocity = 0.1 # m/s
|
for d in self.states_data['ACCEL_LOW']:
|
if d.velocity > threshold_velocity:
|
delay_ms = d.time_ms - accel_first.time_ms
|
results['throttle_response_delay_ms'] = float(delay_ms)
|
print(f"油门响应延迟: {delay_ms} ms")
|
break
|
|
return results
|
|
def generate_plots(self, output_dir: Path):
|
"""生成可视化图表"""
|
print("\n=== 生成可视化图表 ===")
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
# 1. 速度-时间曲线
|
fig, ax = plt.subplots(figsize=(14, 6))
|
times = [(d.time_ms - self.data[0].time_ms) / 1000.0 for d in self.data]
|
velocities = [d.velocity for d in self.data]
|
throttles = [d.throttle_pwm for d in self.data]
|
|
ax.plot(times, velocities, 'b-', linewidth=1.5, label='速度 (m/s)')
|
ax.set_xlabel('时间 (秒)', fontsize=12)
|
ax.set_ylabel('速度 (m/s)', fontsize=12, color='b')
|
ax.tick_params(axis='y', labelcolor='b')
|
ax.grid(True, alpha=0.3)
|
|
# 双轴显示PWM
|
ax2 = ax.twinx()
|
ax2.plot(times, throttles, 'r--', linewidth=1, alpha=0.6, label='油门PWM')
|
ax2.set_ylabel('油门PWM', fontsize=12, color='r')
|
ax2.tick_params(axis='y', labelcolor='r')
|
|
# 标注状态
|
current_state = self.data[0].state
|
state_start_time = times[0]
|
for i, (t, d) in enumerate(zip(times, self.data)):
|
if d.state != current_state or i == len(self.data) - 1:
|
ax.axvspan(state_start_time, t, alpha=0.1, color='gray')
|
ax.text((state_start_time + t) / 2, max(velocities) * 0.9,
|
current_state, ha='center', fontsize=8, rotation=0)
|
current_state = d.state
|
state_start_time = t
|
|
plt.title('速度与油门PWM随时间变化', fontsize=14)
|
fig.tight_layout()
|
plt.savefig(output_dir / 'velocity_time.png', dpi=150)
|
print(f"保存: {output_dir / 'velocity_time.png'}")
|
plt.close()
|
|
# 2. 轨迹图
|
fig, ax = plt.subplots(figsize=(10, 10))
|
xs = [d.enu_x for d in self.data]
|
ys = [d.enu_y for d in self.data]
|
|
# 按状态着色
|
state_colors = {}
|
color_idx = 0
|
colors_list = plt.cm.tab10(np.linspace(0, 1, 10))
|
|
for state in self.states_data.keys():
|
state_colors[state] = colors_list[color_idx % 10]
|
color_idx += 1
|
|
for state, dlist in self.states_data.items():
|
xs_state = [d.enu_x for d in dlist]
|
ys_state = [d.enu_y for d in dlist]
|
ax.plot(xs_state, ys_state, color=state_colors[state],
|
linewidth=2, label=state, marker='o', markersize=2)
|
|
ax.set_xlabel('东向 (m)', fontsize=12)
|
ax.set_ylabel('北向 (m)', fontsize=12)
|
ax.set_title('车辆运动轨迹 (ENU坐标)', fontsize=14)
|
ax.grid(True, alpha=0.3)
|
ax.axis('equal')
|
ax.legend(fontsize=8, loc='best')
|
|
plt.tight_layout()
|
plt.savefig(output_dir / 'trajectory.png', dpi=150)
|
print(f"保存: {output_dir / 'trajectory.png'}")
|
plt.close()
|
|
# 3. PWM-速度关系图
|
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
|
|
# 前向速度
|
cruise_states = ['CRUISE_LOW', 'CRUISE_MID', 'CRUISE_HIGH']
|
for state in cruise_states:
|
if state in self.states_data:
|
dlist = self.states_data[state]
|
stable_data = dlist[len(dlist)//2:]
|
pwms = [d.throttle_pwm for d in stable_data]
|
vels = [d.velocity for d in stable_data]
|
ax1.scatter(pwms, vels, label=state, alpha=0.5, s=20)
|
|
ax1.set_xlabel('油门PWM', fontsize=12)
|
ax1.set_ylabel('速度 (m/s)', fontsize=12)
|
ax1.set_title('PWM-速度关系', fontsize=14)
|
ax1.grid(True, alpha=0.3)
|
ax1.legend()
|
|
# 转向角速度
|
turn_states = ['TURN_LEFT_LIGHT', 'TURN_RIGHT_LIGHT', 'TURN_LEFT_HEAVY', 'TURN_RIGHT_HEAVY']
|
for state in turn_states:
|
if state in self.states_data:
|
dlist = self.states_data[state]
|
start_idx = len(dlist) // 4
|
end_idx = 3 * len(dlist) // 4
|
stable_data = dlist[start_idx:end_idx]
|
steering_offsets = [d.steering_pwm - 1500 for d in stable_data]
|
yaw_rates = [d.yaw_rate for d in stable_data]
|
ax2.scatter(steering_offsets, yaw_rates, label=state, alpha=0.5, s=20)
|
|
ax2.axhline(0, color='k', linestyle='--', linewidth=0.5)
|
ax2.axvline(0, color='k', linestyle='--', linewidth=0.5)
|
ax2.set_xlabel('舵机PWM偏移 (相对1500)', fontsize=12)
|
ax2.set_ylabel('角速度 (rad/s)', fontsize=12)
|
ax2.set_title('舵机PWM-角速度关系', fontsize=14)
|
ax2.grid(True, alpha=0.3)
|
ax2.legend()
|
|
plt.tight_layout()
|
plt.savefig(output_dir / 'pwm_relationships.png', dpi=150)
|
print(f"保存: {output_dir / 'pwm_relationships.png'}")
|
plt.close()
|
|
# 4. 加速度分析
|
fig, ax = plt.subplots(figsize=(12, 6))
|
|
accel_states = ['ACCEL_LOW', 'ACCEL_MID', 'ACCEL_HIGH', 'BRAKE']
|
for state in accel_states:
|
if state in self.states_data:
|
dlist = self.states_data[state]
|
times_rel = [(d.time_ms - dlist[0].time_ms) / 1000.0 for d in dlist]
|
vels = [d.velocity for d in dlist]
|
ax.plot(times_rel, vels, marker='o', markersize=3, label=state, linewidth=2)
|
|
ax.set_xlabel('相对时间 (秒)', fontsize=12)
|
ax.set_ylabel('速度 (m/s)', fontsize=12)
|
ax.set_title('加速/制动特性', fontsize=14)
|
ax.grid(True, alpha=0.3)
|
ax.legend()
|
|
plt.tight_layout()
|
plt.savefig(output_dir / 'acceleration.png', dpi=150)
|
print(f"保存: {output_dir / 'acceleration.png'}")
|
plt.close()
|
|
def generate_json_report(self, output_file: Path):
|
"""生成JSON参数报告"""
|
print("\n=== 生成JSON参数报告 ===")
|
|
forward_model = self.analyze_forward_model()
|
steering_model = self.analyze_steering_model()
|
delays = self.analyze_delays()
|
|
report = {
|
'metadata': {
|
'log_file': str(self.log_file),
|
'total_samples': len(self.data),
|
'duration_seconds': (self.data[-1].time_ms - self.data[0].time_ms) / 1000.0,
|
'origin': {
|
'latitude': self.origin_lat,
|
'longitude': self.origin_lon,
|
'altitude': self.origin_alt
|
},
|
'states': {state: len(dlist) for state, dlist in self.states_data.items()}
|
},
|
'differential_drive_model': {
|
'forward_model': forward_model,
|
'steering_model': steering_model,
|
'delays': delays
|
},
|
'recommended_config': self._generate_recommended_config(forward_model, steering_model, delays)
|
}
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
json.dump(report, f, indent=2, ensure_ascii=False)
|
|
print(f"JSON报告已保存: {output_file}")
|
|
return report
|
|
def _generate_recommended_config(self, forward_model: Dict, steering_model: Dict, delays: Dict) -> Dict:
|
"""生成推荐的配置参数"""
|
config = {}
|
|
# 前向模型参数
|
if 'pwm_to_velocity' in forward_model:
|
config['MC_CFG_FORWARD_K'] = forward_model['pwm_to_velocity']['k']
|
config['MC_CFG_FORWARD_BIAS'] = forward_model['pwm_to_velocity']['bias']
|
|
# 转向模型参数
|
if 'pwm_to_yaw_rate' in steering_model:
|
config['MC_CFG_STEERING_K'] = steering_model['pwm_to_yaw_rate']['k']
|
|
# 加速度限制
|
if 'acceleration' in forward_model:
|
max_accel = max(forward_model['acceleration'].values()) if forward_model['acceleration'] else 1.0
|
config['MC_CFG_MAX_ACCELERATION'] = max_accel
|
|
if 'deceleration' in forward_model:
|
config['MC_CFG_MAX_DECELERATION'] = abs(forward_model['deceleration'])
|
|
# 响应延迟
|
if 'throttle_response_delay_ms' in delays:
|
config['MC_CFG_RESPONSE_DELAY_MS'] = delays['throttle_response_delay_ms']
|
|
return config
|
|
|
def main():
|
parser = argparse.ArgumentParser(description='车辆运动模型校准数据分析工具')
|
parser.add_argument('log_file', type=str, help='校准日志文件路径 (.log或.txt)')
|
parser.add_argument('-o', '--output', type=str, default='calibration_results',
|
help='输出目录 (默认: calibration_results)')
|
|
args = parser.parse_args()
|
|
log_file = Path(args.log_file)
|
if not log_file.exists():
|
print(f"错误: 日志文件不存在: {log_file}")
|
sys.exit(1)
|
|
output_dir = Path(args.output)
|
|
print("=" * 60)
|
print("车辆运动模型校准数据分析工具")
|
print("=" * 60)
|
|
# 创建分析器
|
analyzer = CalibrationAnalyzer(str(log_file))
|
|
# 解析日志
|
analyzer.parse_log()
|
|
# 计算ENU坐标和速度
|
analyzer.compute_enu_and_velocity()
|
|
# 生成可视化图表
|
analyzer.generate_plots(output_dir)
|
|
# 生成JSON报告
|
json_file = output_dir / 'calibration_parameters.json'
|
report = analyzer.generate_json_report(json_file)
|
|
# 打印推荐配置
|
print("\n" + "=" * 60)
|
print("推荐的配置参数 (可复制到motion_config.h):")
|
print("=" * 60)
|
for key, value in report['recommended_config'].items():
|
if isinstance(value, float):
|
print(f"#define {key:<35} ({value:.8f}f)")
|
else:
|
print(f"#define {key:<35} ({value})")
|
|
print("\n" + "=" * 60)
|
print("分析完成!")
|
print(f"输出目录: {output_dir.absolute()}")
|
print("=" * 60)
|
|
|
if __name__ == '__main__':
|
main()
|