# 第五章:追踪分析
本章介绍如何分析分布式追踪数据,定位性能瓶颈和故障。
## 追踪数据分析
### 服务依赖分析
通过追踪数据构建服务依赖图:
┌─────────────────────────────────────────────────────────────┐
│ 服务依赖分析 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 服务调用关系: │
│ │
│ API Gateway │
│ │ │
│ ├──► User Service (1000 calls/min, 45ms avg) │
│ │ │ │
│ │ ├──► MySQL (500 calls/min, 10ms avg) │
│ │ └──► Redis (800 calls/min, 2ms avg) │
│ │ │
│ └──► Order Service (500 calls/min, 80ms avg) │
│ │ │
│ ├──► Payment Service (400 calls/min) │
│ │ │ │
│ │ └──► Stripe API (380 calls/min) │
│ │ │
│ └──► Inventory Service (450 calls/min) │
│ │
│ 关键发现: │
│ - Order Service 响应时间最长 (80ms) │
│ - Payment Service 调用失败率最高 (5%) │
│ - Redis 缓存命中率 80% │
│ │
└─────────────────────────────────────────────────────────────┘
### 性能瓶颈识别
// Query slow spans and aggregate them per operation.
// NOTE(review): 100000000 corresponds to 100 ms only if `duration` is stored
// in nanoseconds. Jaeger's Elasticsearch span documents normally store
// `duration` in microseconds (where 100 ms would be 100000) - confirm the
// unit used by this index before relying on the threshold.
GET jaeger-span-*/_search
{
  "query": {
    "bool": {
      "filter": [
        { "range": { "duration": { "gte": 100000000 } } } // >= 100 ms (nanosecond assumption; `gte` is inclusive)
      ]
    }
  },
  "aggs": {
    // Bucket the matching spans by operation name, at most 20 buckets.
    "by_operation": {
      "terms": {
        "field": "operationName.keyword",
        "size": 20
      },
      "aggs": {
        // Average duration of the matching (already slow) spans per operation.
        "avg_duration": {
          "avg": { "field": "duration" }
        }
      }
    }
  }
}
### 错误分析
// Query error spans and group them by service, then by error type.
// NOTE(review): filtering on `tags.error` / `tags.error_type` assumes span
// tags are flattened into top-level fields of the document - confirm against
// the actual index mapping.
GET jaeger-span-*/_search
{
  "query": {
    "bool": {
      "filter": [
        // Exact match on the string value "true".
        { "term": { "tags.error": "true" } }
      ]
    }
  },
  "aggs": {
    // Outer buckets: one per emitting service.
    "by_service": {
      "terms": {
        "field": "process.serviceName.keyword"
      },
      "aggs": {
        // Inner buckets: error types seen within each service.
        "by_error": {
          "terms": {
            "field": "tags.error_type.keyword"
          }
        }
      }
    }
  }
}
## 延迟分析
### 延迟分布
import pandas as pd
import numpy as np
def analyze_latency(spans):
    """Summarise span durations per operation.

    Args:
        spans: Records accepted by ``pd.DataFrame`` (for example a list of
            dicts), each providing ``operation_name`` and ``duration``.

    Returns:
        A DataFrame indexed by ``operation_name`` with the columns
        ``count``, ``mean``, ``median``, ``p95``, ``p99`` and ``max``.
        Empty input yields an empty frame with the same columns.
    """
    columns = ['count', 'mean', 'median', 'p95', 'p99', 'max']
    df = pd.DataFrame(spans)
    if df.empty:
        # Without rows there is no 'operation_name' column to group on;
        # return an empty result instead of raising KeyError.
        return pd.DataFrame(columns=columns).rename_axis('operation_name')

    # Named aggregation: keyword = output column, value = reducer.
    return df.groupby('operation_name')['duration'].agg(
        count='count',
        mean='mean',
        median='median',
        p95=lambda x: np.percentile(x, 95),
        p99=lambda x: np.percentile(x, 99),
        max='max',
    )
### 关键路径分析
┌─────────────────────────────────────────────────────────────┐
│ 关键路径分析 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 请求总耗时: 150ms │
│ │
│ 关键路径(串行耗时): │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ API Gateway (10ms) │ │
│ │ ▼ │ │
│ │ User Service (30ms) │ │
│ │ ▼ │ │
│ │ Order Service (60ms) ← 瓶颈 │ │
│ │ ▼ │ │
│ │ Payment Service (35ms) │ │
│ │ ▼ │ │
│ │ Response (5ms) │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ 优化建议: │
│ 1. Order Service 并行处理子任务 │
│ 2. Payment Service 使用异步调用 │
│ 3. 添加缓存减少数据库查询 │
│ │
└─────────────────────────────────────────────────────────────┘
## 异常检测
### 延迟异常
def detect_latency_anomaly(spans, threshold=2.0, window=100, min_periods=None):
    """Return spans whose duration exceeds a rolling mean + k * std band.

    Args:
        spans: Records accepted by ``pd.DataFrame``; each needs a
            ``duration`` value. Rows are processed in the given order.
        threshold: Number of rolling standard deviations above the rolling
            mean a duration must exceed to be flagged.
        window: Number of consecutive spans in the rolling window
            (previously fixed at 100).
        min_periods: Minimum observations required before a window yields
            a value. ``None`` keeps the pandas default (equal to ``window``),
            so the first ``window - 1`` rows are never flagged; pass a
            smaller value to analyse short inputs.

    Returns:
        The rows flagged as anomalous, including the helper columns
        ``rolling_mean``, ``rolling_std`` and ``is_anomaly``. Empty input
        returns an empty DataFrame.
    """
    df = pd.DataFrame(spans)
    if df.empty:
        # No rows -> no 'duration' column; nothing can be anomalous.
        return df

    # The window ends at (and includes) the current span.
    rolling = df['duration'].rolling(window=window, min_periods=min_periods)
    df['rolling_mean'] = rolling.mean()
    df['rolling_std'] = rolling.std()

    # Comparisons against NaN (incomplete windows) evaluate to False.
    df['is_anomaly'] = (
        df['duration'] > df['rolling_mean'] + threshold * df['rolling_std']
    )
    return df[df['is_anomaly']]
### 错误率异常
def detect_error_rate_anomaly(spans, window=60, threshold=0.05,
                              time_field='start_time'):
    """Return the time windows whose error rate exceeds ``threshold``.

    Args:
        spans: Records accepted by ``pd.DataFrame``; each needs a ``tags``
            mapping and, unless the frame already has a ``DatetimeIndex``,
            a timestamp under ``time_field``.
        window: Width of each time bucket in seconds.
        threshold: Error-rate fraction (0..1) above which a bucket is
            reported.
        time_field: Column holding the span timestamp. Values are parsed
            with ``pd.to_datetime``, which reads plain integers as
            nanoseconds since the epoch.
            NOTE(review): confirm the unit of this field in your span data.

    Returns:
        A Series named ``is_error``, indexed by bucket start time, holding
        the error rate of every bucket above ``threshold``. Empty input
        returns an empty Series.
    """
    def _is_error(tags):
        # The flag may be a real boolean or a string such as "true"/"false";
        # a plain truthiness test would count "false" as an error.
        flag = tags.get('error', False) if isinstance(tags, dict) else False
        if isinstance(flag, str):
            return flag.strip().lower() == 'true'
        return bool(flag)

    df = pd.DataFrame(spans)
    if df.empty:
        return pd.Series(dtype=float, name='is_error')

    # resample() only works on a datetime-like index; the default RangeIndex
    # produced by pd.DataFrame(list_of_dicts) raises TypeError.
    if isinstance(df.index, pd.DatetimeIndex):
        timestamps = df.index
    else:
        timestamps = pd.DatetimeIndex(pd.to_datetime(df[time_field]))

    is_error = pd.Series(
        [float(_is_error(tags)) for tags in df['tags']],
        index=timestamps,
        name='is_error',
    ).sort_index()

    # Buckets without spans have a NaN rate and never exceed the threshold.
    error_rate = is_error.resample(f'{window}s').mean()
    return error_rate[error_rate > threshold]
## 调用链分析
### 调用深度分析
def analyze_call_depth(trace):
    """Compute depth statistics for the span tree of one trace.

    Args:
        trace: Mapping with a ``root_span`` entry. Every span is a mapping
            that may list nested spans under ``children``; a missing or
            ``None`` value means the span has no children.

    Returns:
        A dict with ``max_depth`` (deepest level, root = 0), ``avg_depth``
        (mean depth over all spans) and ``depth_distribution`` (a
        ``Counter`` mapping depth -> number of spans).
    """
    # Local import keeps this snippet self-contained: Counter is not among
    # the imports visible in this file.
    from collections import Counter

    depths = []
    # Explicit stack instead of recursion so very deep traces cannot hit
    # Python's recursion limit.
    pending = [(trace['root_span'], 0)]
    while pending:
        span, depth = pending.pop()
        depths.append(depth)
        children = span.get('children') or []
        # Push in reverse so spans are visited in the same pre-order as a
        # recursive walk would produce.
        pending.extend((child, depth + 1) for child in reversed(children))

    return {
        'max_depth': max(depths),
        'avg_depth': sum(depths) / len(depths),
        'depth_distribution': Counter(depths),
    }
### 调用频率分析
def analyze_call_frequency(traces, top_n=20):
    """Count how often each operation appears across traces.

    Args:
        traces: Iterable of mappings, each with a ``spans`` list whose items
            provide ``operation_name``.
        top_n: Number of most frequent operations to return (default 20,
            the previously fixed limit). ``None`` returns all operations.

    Returns:
        A list of ``(operation_name, count)`` tuples ordered from most to
        least frequent; ties keep first-seen order.
    """
    # Local import keeps this snippet self-contained: Counter is not among
    # the imports visible in this file.
    from collections import Counter

    call_counts = Counter(
        span['operation_name']
        for trace in traces
        for span in trace['spans']
    )
    return call_counts.most_common(top_n)
## 性能优化建议
### 基于追踪的优化
def generate_optimization_suggestions(trace, slow_query_threshold=100000000):
    """Derive optimisation hints from the spans of one trace.

    Each span is checked independently against three rules, so one span can
    produce several suggestions.

    Args:
        trace: Mapping with a ``spans`` list. Spans provide ``span_id`` and
            may carry ``db_statement``, ``duration``, ``is_duplicate`` and
            ``can_parallelize``.
        slow_query_threshold: Duration above which a span with a
            ``db_statement`` is reported as a slow query, in the same unit
            as ``span['duration']``. The default is the previously
            hard-coded value (100 ms if durations are nanoseconds -
            NOTE(review): confirm the unit).

    Returns:
        A list of dicts with the keys ``type``, ``span`` and ``suggestion``,
        in span order.
    """
    suggestions = []
    for span in trace['spans']:
        span_id_needed = False  # placeholder removed below; see rules

        # Rule 1: slow database query. Only the first 100 characters of the
        # statement are echoed to keep the message short.
        statement = span.get('db_statement')
        if statement and span['duration'] > slow_query_threshold:
            suggestions.append({
                'type': 'slow_query',
                'span': span['span_id'],
                'suggestion': f"优化数据库查询: {statement[:100]}",
            })

        # Rule 2: call flagged upstream as a duplicate.
        if span.get('is_duplicate'):
            suggestions.append({
                'type': 'duplicate_call',
                'span': span['span_id'],
                'suggestion': '消除重复调用,使用缓存',
            })

        # Rule 3: serial call flagged upstream as parallelisable.
        if span.get('can_parallelize'):
            suggestions.append({
                'type': 'parallelize',
                'span': span['span_id'],
                'suggestion': '将串行调用改为并行',
            })

        del span_id_needed
    return suggestions
## 可视化分析
### 火焰图
def generate_flame_graph_data(trace):
    """Convert a trace's span tree into nested flame-graph nodes.

    Every node is a dict with ``name`` (the span's ``operation_name``),
    ``value`` (``duration`` divided by 1000000), ``start`` (offset of the
    node) and ``children`` (nodes built from the span's ``children``).
    Children are laid out back to back, the first one starting at its
    parent's offset; recorded start times are not consulted.

    Args:
        trace: Mapping whose ``root_span`` is the top of the span tree.

    Returns:
        The node built for ``trace['root_span']``.
    """
    def build_node(span, offset=0):
        node = {
            'name': span['operation_name'],
            # Original note: converts to milliseconds, i.e. durations are
            # assumed to be nanoseconds.
            'value': span['duration'] / 1000000,
            'start': offset,
            'children': [],
        }
        cursor = offset
        for sub_span in span.get('children', []):
            sub_node = build_node(sub_span, cursor)
            node['children'].append(sub_node)
            # Advance by the child's scaled duration so siblings do not overlap.
            cursor += sub_node['value']
        return node

    return build_node(trace['root_span'])
### Gantt 图
def generate_gantt_data(trace):
    """Turn the spans of a trace into Gantt-chart rows.

    Args:
        trace: Mapping with a ``spans`` list; each span provides
            ``operation_name``, ``start_time``, ``duration`` and
            ``service_name``, and optionally a truthy ``error`` flag.

    Returns:
        One dict per span, in span order, with the keys ``task``,
        ``start``, ``end`` (``start_time + duration``), ``service`` and
        ``status`` (``'error'`` when the span's ``error`` value is truthy,
        otherwise ``'ok'``).
    """
    return [
        {
            'task': span['operation_name'],
            'start': span['start_time'],
            'end': span['start_time'] + span['duration'],
            'service': span['service_name'],
            'status': 'ok' if not span.get('error') else 'error',
        }
        for span in trace['spans']
    ]
## 小结
追踪分析要点:
- 依赖分析:服务调用关系
- 性能瓶颈:慢 Span 识别
- 错误分析:错误聚类和定位
- 延迟分析:分布和异常检测
- 优化建议:数据驱动优化
下一章我们将学习性能优化。