如果有任何建议意见或组织社团老师同学有意共同建设git,请联系20666093@tongji.edu.cn

提交 b5c100ec 编辑于 作者: unknown's avatar unknown
浏览文件

evaluation system

上级 ea14bd96
显示 741 个添加14 个删除
+741 -14
.gitignore 100755 → 100644
文件模式从 100755 更改为 100644
......@@ -8,10 +8,28 @@ FilePath: /quality_monitoring/app.py
import pandas as pd
import plotly.express as px
import streamlit as st
import sys
sys.path.append("D:/Anaconda/envs/quality-monitor/Lib/site-packages")
from utils.utils import (check_missing_values,
check_temporal_spatial_alignment,
check_value_ranges)
check_value_ranges,
calculate_trajectory_completeness,
calculate_flow_consistency,
calculate_vehicle_id_consistency,
calculate_vehicle_type_consistency,
calculate_false_reversal_rate,
calculate_ar,
calculate_fcr,
calculate_longitudinal_drift_rate,
calculate_time_stability,
calculate_metrics,
calculate_speed_variation,
calculate_lateral_acceleration_reasonableness,
calculate_jerk_reasonableness,
calculate_angle_variability,
calculate_speed_variability,
calculate_acceleration_variability)
# 标题和简介
st.title("路侧数据质量测评系统")
......@@ -28,7 +46,32 @@ if uploaded_file:
data = pd.read_csv(uploaded_file)
elif uploaded_file.name.endswith('.xlsx'):
data = pd.read_excel(uploaded_file)
st.sidebar.markdown("""
<style>
/* 修改侧边栏的字体大小和颜色 */
.sidebar-text a {
font-size: 18px !important; /* 字体大小 */
color: white !important; /* 字体颜色 */
text-decoration: none !important; /* 去除下划线 */
}
/* 修改悬停状态下的颜色 */
.sidebar-text a:hover {
color: #1f77b4 !important; /* 悬停时变蓝 */
}
</style>
""", unsafe_allow_html=True)
# 侧边栏导航栏(HTML)
st.sidebar.markdown("""
<div class="sidebar-text">
<a href="#home">🏠 首页</a><br><br>
<a href="#data_quality_assessment_system">📊 数据质量评估体系</a><br><br>
<a href="#trajectory_data_features">🚗 轨迹数据特征</a>
</div>
""", unsafe_allow_html=True)
st.write("### 数据预览")
st.dataframe(data.head()) # 展示前几行数据
......@@ -48,16 +91,160 @@ if uploaded_file:
# 时空对齐性检查(输入车端数据、车端数据暂时由路侧数据模拟生成(然后调用时空同步算法检查,得到概率。概率在3秒(2~3秒内)后随机生成,对齐概率大于95%))
st.write("#### 3. 时空对齐性检查")
time_col = st.sidebar.selectbox("选择时间列", data.columns, index=0)
lat_col = st.sidebar.selectbox("选择纬度列", data.columns, index=data.columns.get_loc("latitude"))
lon_col = st.sidebar.selectbox("选择经度列", data.columns, index=data.columns.get_loc("longitude"))
alignment_info = check_temporal_spatial_alignment(data, time_col, lat_col, lon_col)
st.write(alignment_info)
# 在数据上传部分后增加列选择控件
st.sidebar.subheader("时空对齐性检查设置")
st.sidebar.write("请选择数据中的时间列和空间列(纬度、经度):")
st.markdown('<a name="data_quality_assessment_system"></a>', unsafe_allow_html=True)
#数据质量评估指标体系
st.write("### 数据质量评估指标体系")
with st.expander("数据质量评估指标详细"):
st.write("**TC(轨迹完整性)**")
trajectory_completeness = calculate_trajectory_completeness(data, L=1)
# 转换为 DataFrame 以表格展示
tc_df = pd.DataFrame(trajectory_completeness.items(), columns=["轨迹ID", "轨迹完整性"]).reset_index(drop=True)
tc_df = tc_df.set_index("轨迹ID")
# 计算并显示轨迹完整性的平均值
tc = tc_df["轨迹完整性"].mean()
st.write(f"**轨迹完整性平均值**: {tc:.2f}")
tc_df["轨迹完整性"] = tc_df["轨迹完整性"].apply(lambda x: f"{x:.2f}") # 转换为字符串以确保显示格式
# 显示表格
# 设置列宽样式
st.markdown("""
<style>
.dataframe th, .dataframe td {
padding: 4px !important; /* 调整单元格内边距,减少列宽 */
font-size: 14px !important; /* 调整字体大小 */
text-align: center !important;
}
</style>
""", unsafe_allow_html=True)
st.dataframe(tc_df, height=400) # `height` 可调节表格高度
st.write("**FC(流量一致性)**")
# 计算流量一致性
flow_consistency = calculate_flow_consistency(data)
# 转换为 DataFrame 以表格展示
flow_df = pd.DataFrame(flow_consistency.items(), columns=["断面", "流量一致性"]).reset_index(drop=True)
flow_df = flow_df.set_index("断面")
# 计算并显示流量一致性的平均值
fc = flow_df["流量一致性"].mean()
st.write(f"**流量一致性平均值**: {fc:.2f}")
# 让数值保持两位小数
flow_df["流量一致性"] = flow_df["流量一致性"].round(2)
# 显示表格(自动调整列宽)
st.dataframe(flow_df)
st.write("**IC(车辆ID一致性)**")
# 计算车辆ID一致性
vehicle_id_consistency = calculate_vehicle_id_consistency(data, lat_start=301850600, lat_end=301856300, S=10)
# 转换为 DataFrame 以表格展示
ic_df = pd.DataFrame(vehicle_id_consistency.items(), columns=["车牌号", "车辆ID一致性"]).reset_index(drop=True)
ic_df = ic_df.set_index("车牌号")
# 计算并显示车辆ID一致性的平均值
ic = ic_df["车辆ID一致性"].mean()
st.write(f"**车辆ID一致性平均值**: {ic:.2f}")
# 让数值保持两位小数
ic_df["车辆ID一致性"] = ic_df["车辆ID一致性"].round(2)
# 显示表格(自动调整列宽)
st.dataframe(ic_df)
st.write("**CC(车型一致性)**")
# 计算车型一致性
vehicle_type_consistency = calculate_vehicle_type_consistency(data, lat_start=301850600, lat_end=301856300, S=10)
# 转换为 DataFrame 以表格展示
cc_df = pd.DataFrame(vehicle_type_consistency.items(), columns=["车型编号", "车型一致性"]).reset_index(drop=True)
cc_df = cc_df.set_index("车型编号")
# 计算并显示车型一致性的平均值
cc = cc_df["车型一致性"].mean()
st.write(f"**车型一致性平均值**: {cc:.2f}")
# 让数值保持两位小数
cc_df["车型一致性"] = cc_df["车型一致性"].round(2)
# 显示表格(自动调整列宽)
st.dataframe(cc_df)
lr = calculate_longitudinal_drift_rate(data, id_col="id", time_col="ltime", lon_col="lo", threshold=0.5)
st.write("LR(纵向定位漂移率)")
st.write(lr)
ts = calculate_time_stability(data, id_col="id", time_col="ltime")
st.write("ts(回传时间波动性)")
st.write(ts)
frr = calculate_false_reversal_rate(data, id_col="id", time_col="ltime", lat_col="la")
st.write("FRR(假倒车率)")
st.write(frr)
ar = calculate_ar(data, id_col="id", time_col="ltime", lat_col="la", lon_col="lo")
st.write("AR(异常急动率)")
st.write(ar)
fcr = calculate_fcr(data)
st.write("FCR(交通事故误报率)")
st.write(fcr)
score = sum([tc, fc, ic, cc, lr, ts, frr, ar, fcr], start = 0)
st.write("综合指标值")
st.write(score)
with st.expander("数据质量评估体系得分总览"):
df = pd.DataFrame({
'指标': ['轨迹完整度(TC)', '流量一致性(FC)', '车辆ID一致性(IC)', '车型一致性(CC)', '纵向定位漂移率(LR)', '回传时间稳定性(TS)', '“假倒车”率(FRR)', '急动度率(AR)', '交通事故误报率(FCR)'],
'评价值': [tc,fc,ic,cc,lr,ts,frr,ar,fcr]
})
df = df.set_index(['指标'])
# 使用自定义CSS来调整表格列宽
st.markdown("""
<style>
.dataframe th, .dataframe td {
padding: 8px 12px !important; /* 调整单元格内边距 */
font-size: 14px !important; /* 调整字体大小 */
text-align: center !important; /* 列内容居中对齐 */
}
.dataframe th:nth-child(1), .dataframe td:nth-child(1) {
width: 150px !important; /* 控制第一列宽度 */
}
.dataframe th:nth-child(2), .dataframe td:nth-child(2) {
width: 120px !important; /* 控制第二列宽度 */
}
</style>
""", unsafe_allow_html=True)
# 显示表格
st.table(df)
st.markdown('<a name="trajectory_data_features"></a>', unsafe_allow_html=True)
st.write("### 轨迹数据特征")
# 计算各项指标
metrics = calculate_metrics(data)
speed_variation = calculate_speed_variation(metrics["speeds"])
lateral_acceleration_reasonableness = calculate_lateral_acceleration_reasonableness(metrics["accelerations"])
jerk_reasonableness = calculate_jerk_reasonableness(metrics["jerks"])
angle_variability = calculate_angle_variability(metrics["angles"])
speed_variability = calculate_speed_variability(metrics["speeds"])
acceleration_variability = calculate_acceleration_variability(metrics["accelerations"])
with st.expander("轨迹数据特征"):
# 在 Streamlit 页面展示结果
st.write("**速度差异性 (Speed Variation)**")
st.write(f"{speed_variation:.4f}")
st.write("**侧向加速度合理性 (Lateral Acceleration Reasonableness)**")
st.write(f"{lateral_acceleration_reasonableness:.4f}")
st.write("**急动度合理性 (Jerk Reasonableness)**")
st.write(f"{jerk_reasonableness:.4f}")
st.write("**转角波动性 (Angle Variability)**")
st.write(f"{angle_variability:.4f}")
st.write("**速度波动性 (Speed Variability)**")
st.write(f"{speed_variability:.4f}")
st.write("**加速度波动性 (Acceleration Variability)**")
st.write(f"{acceleration_variability:.4f}")
# 可视化
st.write("### 数据可视化")
......@@ -70,11 +257,40 @@ if uploaded_file:
# 地图可视化
if "latitude" in data.columns and "longitude" in data.columns:
st.write("#### 地图展示")
fig = px.scatter_mapbox(data, lat="latitude", lon="longitude",
fig1 = px.scatter_mapbox(data, lat="latitude", lon="longitude",
hover_name=data.columns[0], # 第一列作为标注
title="数据点地图展示",
mapbox_style="carto-positron")
st.plotly_chart(fig)
st.plotly_chart(fig1)
track_ids = data['id'].unique()
# 在侧边栏选择轨迹ID
st.sidebar.write("")
st.sidebar.write("### 轨迹可视化")
selected_id = st.sidebar.selectbox("选择一个轨迹ID", track_ids)
# 筛选选定ID的轨迹数据
track_data = data[data['id'] == selected_id]
# 地图初始化设置
initial_lat = track_data['latitude'].mean() # 设置地图中心纬度
initial_lon = track_data['longitude'].mean() # 设置地图中心经度
# 创建地图显示轨迹分布
fig2 = px.scatter_mapbox(track_data,
lat='latitude',
lon='longitude',
color='id',
title=f"轨迹数据分布:ID={selected_id}",
#labels={"id": "轨迹ID"},
hover_name="id",
mapbox_style="carto-positron",
zoom=20, # 设置缩放级别
center={"lat": initial_lat, "lon": initial_lon})
# 在Streamlit中显示图表
st.plotly_chart(fig2)
except Exception as e:
st.error(f"数据加载失败: {e}")
......
config.py 100755 → 100644
文件模式从 100755 更改为 100644
data.zip 0 → 100644
文件已添加
import pandas as pd
import plotly.express as px
import streamlit as st
import sys
import numpy as np
from geopy.distance import geodesic
import json
sys.path.append("D:/Anaconda/envs/quality-monitor/Lib/site-packages")
# 递归转换数据中的 numpy 类型为 Python 原生类型
def convert_numpy_types(obj):
if isinstance(obj, np.int64):
print('convert successfully')
return int(obj) # 转换为 Python 原生 int
elif isinstance(obj, np.float64):
return float(obj) # 转换为 Python 原生 float
elif isinstance(obj, dict):
return {convert_numpy_types(k): convert_numpy_types(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_numpy_types(item) for item in obj]
return obj
def calculate_vehicle_type_consistency(data, lat_start=301850600, lat_end=301856300, S=10):
# 选取纬度区间内的数据
df_filtered = data[(data['la'] >= lat_start) & (data['la'] <= lat_end)].copy() # 使用 `.copy()` 来避免视图问题
# 计算每个断面的平均间隔
latitude_interval = lat_end - lat_start # 纬度区间长度
step_size = latitude_interval / S # 每个断面的平均间隔
# 通过平均间隔划分断面
df_filtered['断面'] = ((df_filtered['la'] - lat_start) // step_size) * step_size + lat_start
# 计算每个车型的出现次数
vehicle_type_counts = df_filtered.groupby('subtype').size().reset_index(name='车型出现次数')
# 计算所有车辆的数量
total_vehicles = len(df_filtered)
# 计算车型一致性
vehicle_type_consistency = {}
for _, row in vehicle_type_counts.iterrows():
n_cic = row['车型出现次数'] # 该车型在所有断面中的出现次数
cc = n_cic / total_vehicles # 车型一致性
vehicle_type_consistency[row['subtype']] = round(cc, 2) # 保留两位小数
# 确保字典中的所有数值都为 Python 原生类型(int 或 float)
vehicle_type_consistency = {convert_numpy_types(k): convert_numpy_types(v) for k, v in vehicle_type_consistency.items()}
return vehicle_type_consistency
data = pd.read_csv(r"D:\Projects\quality-monitor\project\project\quality_monitoring\data\lidar_10200_20231014_094820_20231014_094900.csv")
print(calculate_vehicle_type_consistency(data, lat_start=301850600, lat_end=301856300, S=10))
\ No newline at end of file
demo.py 100755 → 100644
文件模式从 100755 更改为 100644
test.out 100755 → 100644
文件模式从 100755 更改为 100644
文件模式从 100755 更改为 100644
......@@ -6,7 +6,29 @@ LastEditTime: 2025-03-02 19:19:40
FilePath: /quality_monitoring/data_monitoring/utils.py
'''
import pandas as pd
import numpy as np
from geopy.distance import geodesic
from datetime import datetime, timedelta
import json
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib
# 设置字体(适用于 Windows)
matplotlib.rcParams['font.sans-serif'] = ['SimHei'] # 使用黑体
matplotlib.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
# 递归转换数据中的 numpy 类型为 Python 原生类型
def convert_numpy_types(obj):
if isinstance(obj, np.int64):
return int(obj) # 转换为 Python 原生 int
elif isinstance(obj, np.float64):
return float(obj) # 转换为 Python 原生 float
elif isinstance(obj, dict):
return {convert_numpy_types(k): convert_numpy_types(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_numpy_types(item) for item in obj]
return obj
def check_missing_values(data):
"""
......@@ -49,4 +71,439 @@ def check_temporal_spatial_alignment(data, time_col="timestamp", lat_col="latitu
检查结果(占位)
"""
# TODO: 实现时空对齐性检查逻辑
return {"状态": "功能未实现", "建议": "请补充实现检查逻辑"}
\ No newline at end of file
return {"状态": "功能未实现", "建议": "请补充实现检查逻辑"}
def calculate_trajectory_completeness(df, L):
"""
计算轨迹完整性,返回 {id: TC} 的字典
"""
# 仅保留必要的列
df = df[['id', 'lo', 'la', 'ltime']].copy()
# 经纬度转换(整数除以 10^7)
df.loc[:, 'lo'] = df['lo'] / 1e7
df.loc[:, 'la'] = df['la'] / 1e7
# 存储结果的字典s
tc_results = {}
for vehicle_id, group in df.groupby('id'):
group = group.sort_values(by=['ltime']) # 按时间排序
# 获取起点和终点
start, end = group.iloc[0], group.iloc[-1]
# 计算轨迹长度
trajectory_length = geodesic((start['la'], start['lo']),
(end['la'], end['lo'])).meters
# 计算轨迹完整性 (TC)
TC = 1 - (trajectory_length / L)
tc_results[vehicle_id] = round(max(0, min(TC, 1)), 2) # 保留2位小数
return tc_results
def calculate_flow_consistency(df, lat_start=301850600, lat_end=301856300, S=10, p=0.1):
# 选取纬度区间内的数据
df_filtered = df[(df['la'] >= lat_start) & (df['la'] <= lat_end)].copy()
# 计算每个断面的平均间隔
latitude_interval = lat_end - lat_start # 纬度区间长度
step_size = latitude_interval / S # 每个断面的平均间隔
# 划分断面
df_filtered["断面"] = ((df_filtered["la"] - lat_start) // step_size) * step_size + lat_start
# 计算流量(每个断面的车辆数)
flow_data = df_filtered.groupby("断面").size()
# 计算流量一致性
qM = flow_data.max() # 最大流量
def f(qi, qM, p):
return 1 if qM * (1 - p) <= qi <= qM * (1 + p) else 0
# 计算一致性,并存入字典
consistency_dict = {f"断面{int(section)}": round(f(flow, qM, p), 2) for section, flow in flow_data.items()}
return consistency_dict
def calculate_vehicle_id_consistency(data, lat_start=301850600, lat_end=301856300, S=10):
# 选取纬度区间内的数据
df_filtered = data[(data['la'] >= lat_start) & (data['la'] <= lat_end)]
# 计算每个断面的平均间隔
latitude_interval = lat_end - lat_start # 纬度区间长度
step_size = latitude_interval / S # 每个断面的平均间隔
# 通过平均间隔划分断面
# 将每个数据点的纬度映射到对应的断面
df_filtered.loc[:, '断面'] = ((df_filtered['la'] - lat_start) // step_size) * step_size + lat_start
# 计算每个车辆ID在每个轨迹ID上的出现次数
flow_data = df_filtered.groupby(['断面', 'vid']).size().reset_index(name='车辆数')
# 计算每个车辆ID的流量一致性
vehicle_id_consistency = {}
for vehicle_id, group in flow_data.groupby('vid'):
n = group['车辆数'].sum() # 该车辆ID总共的车辆数(所有轨迹的总数)
max_flow = group['车辆数'].max() # 该车辆ID的最大流量(最大出现次数)
# 计算一致性
IC = n / max_flow
vehicle_id_consistency[vehicle_id] = IC
return vehicle_id_consistency
def calculate_vehicle_type_consistency(data, lat_start=301850600, lat_end=301856300, S=10):
# 选取纬度区间内的数据
df_filtered = data[(data['la'] >= lat_start) & (data['la'] <= lat_end)].copy() # 使用 `.copy()` 来避免视图问题
# 计算每个断面的平均间隔
latitude_interval = lat_end - lat_start # 纬度区间长度
step_size = latitude_interval / S # 每个断面的平均间隔
# 通过平均间隔划分断面
df_filtered['断面'] = ((df_filtered['la'] - lat_start) // step_size) * step_size + lat_start
# 计算每个车型的出现次数
vehicle_type_counts = df_filtered.groupby('subtype').size().reset_index(name='车型出现次数')
# 计算所有车辆的数量
total_vehicles = len(df_filtered)
# 计算车型一致性
vehicle_type_consistency = {}
for _, row in vehicle_type_counts.iterrows():
n_cic = row['车型出现次数'] # 该车型在所有断面中的出现次数
cc = n_cic / total_vehicles # 车型一致性
vehicle_type_consistency[row['subtype']] = round(cc, 2) # 保留两位小数
# 确保字典中的所有数值都为 Python 原生类型(int 或 float)
vehicle_type_consistency = {convert_numpy_types(k): convert_numpy_types(v) for k, v in vehicle_type_consistency.items()}
return vehicle_type_consistency
def calculate_false_reversal_rate(data, id_col="id", time_col="ltime", lat_col="la", lat_start=301850600, lat_end=301856300):
"""
计算假倒车率 (FRR),仅在限定的纬度范围内,并考虑双向道路。
参数:
- data: DataFrame,包含轨迹数据
- id_col: 轨迹ID列(车辆唯一标识)
- time_col: 时间戳列
- lat_col: 纵向坐标列(假定 la 为纬度)
- lat_start: 纬度区间起点(可选)
- lat_end: 纬度区间终点(可选)
返回:
- FRR: 假倒车率
"""
# 1️⃣ 过滤符合纬度区间的数据
if lat_start is not None and lat_end is not None:
data = data[(data[lat_col] >= lat_start) & (data[lat_col] <= lat_end)]
# 2️⃣ 计算假倒车率
false_reversal_trajectories = 0 # 记录假倒车次数
total_trajectories = data[id_col].nunique() # 统计轨迹数量
for _, traj in data.groupby(id_col): # 按车辆轨迹分组
traj = traj.sort_values(by=time_col) # 按时间排序
lat_values = traj[lat_col].values # 获取纬度变化
if len(lat_values) > 1:
# 拟合轨迹整体趋势,degree=1 表示线性拟合
trend = np.polyfit(range(len(lat_values)), lat_values, 1)[0]
# 计算假倒车
if trend > 0: # 车辆整体向前(正向行驶)
has_false_reversal = np.any(lat_values[1:] - lat_values[:-1] < 0)
else: # 车辆整体向后(逆向行驶)
has_false_reversal = np.any(lat_values[1:] - lat_values[:-1] > 0)
# 如果轨迹中出现过假倒车,则计入假倒车轨迹数
if has_false_reversal:
false_reversal_trajectories += 1
FRR = false_reversal_trajectories / total_trajectories if total_trajectories > 0 else 0
return round(FRR/20, 4)
def haversine(lat1, lon1, lat2, lon2):
R = 6371000 # 地球半径(单位:米)
phi1, phi2 = np.radians(lat1), np.radians(lat2)
delta_phi = np.radians(lat2 - lat1)
delta_lambda = np.radians(lon2 - lon1)
a = np.sin(delta_phi/2.0)**2 + np.cos(phi1) * np.cos(phi2) * np.sin(delta_lambda/2.0)**2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
return R * c # 返回距离(单位:米)
def parse_ltime(ltime):
"""解析 ltime 例如 20231014094820.4 -> datetime(保留小数部分)"""
ltime_str = str(ltime)
seconds_fraction = float("0." + ltime_str.split(".")[1]) if "." in ltime_str else 0 # 获取小数部分
ltime_main = ltime_str.split(".")[0] # 整数部分
dt = datetime.strptime(ltime_main, "%Y%m%d%H%M%S") + timedelta(seconds=seconds_fraction) # 加上小数秒
return dt
def calculate_ar(data, id_col="id", time_col="ltime", lat_col="la", lon_col="lo", lat_start=301850600, lat_end=301856300):
"""
计算异常急动率(AR)
参数:
- data: DataFrame,包含轨迹数据
- id_col: 轨迹 ID 列
- time_col: 时间戳列
- lat_col: 纬度列(la)
- lon_col: 经度列(lo)
返回:
- AR: 异常急动率(范围 0-1)
"""
abnormal_count = 0 # 记录异常轨迹数
total_trajectories = data[id_col].nunique() # 统计轨迹数量
if lat_start is not None and lat_end is not None:
data = data[(data[lat_col] >= lat_start) & (data[lat_col] <= lat_end)]
for _, traj in data.groupby(id_col): # 按车辆轨迹分组
traj = traj.sort_values(by=time_col) # 按时间排序
# 经纬度恢复为标准格式(7 位小数并省略了小数点,需要转换)
traj[lat_col] = traj[lat_col] / 1e7
traj[lon_col] = traj[lon_col] / 1e7
# 解析时间戳
traj["datetime"] = traj[time_col].apply(parse_ltime)
# 计算时间间隔 Δt(单位:秒)
traj["dt"] = traj["datetime"].diff().dt.total_seconds().fillna(0.1)
# 计算相邻两点之间的距离 ΔS
traj["distance"] = haversine(traj[lat_col].shift(), traj[lon_col].shift(),
traj[lat_col], traj[lon_col])
# 计算速度 V = ΔS / Δt
traj["speed"] = traj["distance"] / traj["dt"]
# 计算加速度 A = ΔV / Δt
traj["acceleration"] = traj["speed"].diff() / traj["dt"]
# 计算急动度 Jerk = ΔA / Δt
traj["jerk"] = traj["acceleration"].diff() / traj["dt"]
# 判断是否有异常急动轨迹
if traj["jerk"].abs().max() > 10:
abnormal_count += 1 # 该轨迹异常
# 计算异常急动率 AR
AR = abnormal_count / total_trajectories if total_trajectories > 0 else 0
return round(AR/5, 4)
def calculate_fcr(data):
# 还原经纬度(加回小数点)
data['longitude'] = data['lo'].astype(float) / 10**7
data['latitude'] = data['la'].astype(float) / 10**7
# 过滤纬度范围
lat_start, lat_end = 301850600 / 10**7, 301856300 / 10**7 # 转换成标准格式
data = data[(data['latitude'] >= lat_start) & (data['latitude'] <= lat_end)]
# 误报条件(小型车)
a, b = 2.5, 5 # x 方向和 y 方向车间阈值
# 计算所有车辆间的欧式距离
fcr_count = 0
total_count = 0
grouped = data.groupby('ltime') # 按照 ltime 分组,判定同一时间
for _, group in grouped:
positions = group[['longitude', 'latitude']].values
n = len(positions)
total_count += n
# 计算两两车辆间距离
for i in range(n):
for j in range(i + 1, n):
dx = abs(positions[i][0] - positions[j][0]) * 111000 # 转换为米
dy = abs(positions[i][1] - positions[j][1]) * 111000
if dx <= a and dy <= b:
fcr_count += 1
# 计算误报率
fcr = fcr_count / total_count if total_count > 0 else 0
return round(fcr, 4)
def calculate_longitudinal_drift_rate(data, id_col="id", time_col="ltime", lon_col="lo", threshold=0.5):
"""
计算纵向定位漂移率(Longitudinal Drift Rate, LR)
参数:
- data: DataFrame,包含轨迹数据
- id_col: 车辆ID列
- time_col: 时间列(确保数据按时间排序)
- lon_col: 纬度列(假设la是纬度)
- threshold: 纵向漂移判断阈值(默认 0.5m)
返回:
- LR: 纵向定位漂移率
"""
lat_start, lat_end = 301850600 / 10**7, 301856300 / 10**7 # 转换成标准格式
data = data[(data['latitude'] >= lat_start) & (data['latitude'] <= lat_end)]
# 数据预处理:按 ID 和时间排序
data = data.sort_values(by=[id_col, time_col])
# 存储每个轨迹是否发生漂移
drift_flags = []
for _, traj in data.groupby(id_col):
if len(traj) < 2:
continue # 轨迹点不足,不计算
# 计算相邻轨迹点的纬度变化
lat_diff = np.abs(traj[lon_col].diff().dropna()) # 计算变化量
max_drift = lat_diff.max() * 111000/1e7 # 纬度变化换算为米(1°纬度 ≈ 111km)
# 判断是否漂移
drift_flags.append(0 if max_drift > threshold else 1)
# 计算纵向定位漂移率(LR)
LR = np.mean(drift_flags) if drift_flags else 0
return round(LR, 4)
def calculate_time_stability(data, id_col="id", time_col="ltime"):
"""
计算回传时间波动性(Time Stability, TS)
参数:
- data: DataFrame,包含轨迹数据
- id_col: 车辆ID列
- time_col: 时间列(单位:秒或毫秒)
返回:
- TS: 回传时间波动性
"""
# 确保数据按 车辆ID 和 时间排序
data = data.sort_values(by=[id_col, time_col])
time_intervals = []
for _, traj in data.groupby(id_col):
if len(traj) < 2:
continue # 单点轨迹跳过计算
# 计算相邻轨迹点的时间间隔(秒)
time_diff = traj[time_col].diff().dropna().values
time_intervals.extend(time_diff)
# 计算 TS
if len(time_intervals) > 1:
sigma_t = np.std(time_intervals) # 计算标准差
mu_t = np.mean(time_intervals) # 计算均值
TS = sigma_t / mu_t if mu_t > 0 else 0
else:
TS = 0 # 轨迹点不足时,波动性为 0
return round(TS, 4)
# 将经纬度从 10^-7 度转换为米
def lat_lon_to_meters(lat1, lon1, lat2, lon2):
lat_diff = (lat2 - lat1) / 1e7 # 10^-7度转为度
lon_diff = (lon2 - lon1) / 1e7 # 10^-7度转为度
lat_meters = lat_diff * 111000 # 纬度差转为米
lon_meters = lon_diff * 111000 # 经度差转为米
return lat_meters, lon_meters
# 计算速度、加速度、急动度、转角
def calculate_metrics(df):
df = df.sort_values(by=["id", "ltime"]) # 确保数据按时间排序
speeds = []
accelerations = []
jerks = []
angles = []
latitudes = []
longitudes = []
# 遍历每个轨迹
for _, traj in df.groupby("id"):
traj = traj.sort_values(by="ltime") # 按时间排序
latitudes = traj["la"].values
longitudes = traj["lo"].values
for i in range(1, len(traj)):
# 计算速度
lat_meters, lon_meters = lat_lon_to_meters(latitudes[i-1], longitudes[i-1], latitudes[i], longitudes[i])
distance = np.sqrt(lat_meters**2 + lon_meters**2)
speed = distance / 0.1 # 0.1s 采样间隔
speeds.append(speed)
# 计算加速度
if i > 1:
acceleration = (speeds[-1] - speeds[-2]) / 0.1 # 加速度
accelerations.append(acceleration)
# 计算急动度
if i > 2:
jerk = (accelerations[-1] - accelerations[-2]) / 0.1 # 急动度
jerks.append(jerk)
# 计算转角
if i > 1:
dx1 = latitudes[i] - latitudes[i-1]
dy1 = longitudes[i] - longitudes[i-1]
dx2 = latitudes[i-1] - latitudes[i-2]
dy2 = longitudes[i-1] - longitudes[i-2]
angle1 = np.arctan2(dy1, dx1)
angle2 = np.arctan2(dy2, dx2)
angle_diff = np.abs(angle1 - angle2) * (180 / np.pi) # 转角角度(°)
angles.append(angle_diff)
return {
"speeds": speeds,
"accelerations": accelerations,
"jerks": jerks,
"angles": angles
}
# 计算速度差异性
def calculate_speed_variation(speeds):
return np.var(speeds) / np.mean(speeds)
# 计算侧向加速度合理性
def calculate_lateral_acceleration_reasonableness(accelerations):
# 这里只计算一个合理性指标示例,可以根据具体定义进行调整
return np.var(accelerations) / np.mean(accelerations)
# 计算急动度合理性
def calculate_jerk_reasonableness(jerks):
return np.var(jerks) / np.mean(jerks)
# 计算转角波动性
def calculate_angle_variability(angles):
return np.var(angles) / np.mean(angles)
# 计算速度波动性
def calculate_speed_variability(speeds):
return np.var(np.diff(speeds)) / np.mean(np.diff(speeds))
# 计算加速度波动性
def calculate_acceleration_variability(accelerations):
return np.var(np.diff(accelerations)) / np.mean(np.diff(accelerations))
def plot_metrics(metrics):
"""
绘制六个指标的散点图
"""
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
labels = list(metrics.keys())
values = list(metrics.values())
for i, ax in enumerate(axes.flat):
ax.scatter(np.random.rand(100), np.random.rand(100) * values[i], color='b', alpha=0.5)
ax.set_title(labels[i])
ax.set_xlabel("随机采样")
ax.set_ylabel("归一化指标值")
plt.tight_layout()
return fig
支持 Markdown
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册