跳转至

第五章:追踪分析

本章介绍如何分析分布式追踪数据,定位性能瓶颈和故障。

追踪数据分析

服务依赖分析

通过追踪数据构建服务依赖图:

┌─────────────────────────────────────────────────────────────┐
│                    服务依赖分析                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  服务调用关系:                                              │
│                                                             │
│  API Gateway                                                │
│       │                                                     │
│       ├──► User Service (1000 calls/min, 45ms avg)         │
│       │         │                                           │
│       │         ├──► MySQL (500 calls/min, 10ms avg)       │
│       │         └──► Redis (800 calls/min, 2ms avg)        │
│       │                                                     │
│       └──► Order Service (500 calls/min, 80ms avg)         │
│                 │                                           │
│                 ├──► Payment Service (400 calls/min)        │
│                 │         │                                 │
│                 │         └──► Stripe API (380 calls/min)   │
│                 │                                           │
│                 └──► Inventory Service (450 calls/min)      │
│                                                             │
│  关键发现:                                                  │
│  - Order Service 响应时间最长 (80ms)                        │
│  - Payment Service 调用失败率最高 (5%)                      │
│  - Redis 缓存命中率 80%                                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

性能瓶颈识别

// 查询慢 Span
GET jaeger-span-*/_search
{
  "query": {
    "bool": {
      "filter": [
        { "range": { "duration": { "gte": 100000000 } } }  // > 100ms
      ]
    }
  },
  "aggs": {
    "by_operation": {
      "terms": {
        "field": "operationName.keyword",
        "size": 20
      },
      "aggs": {
        "avg_duration": {
          "avg": { "field": "duration" }
        }
      }
    }
  }
}

错误分析

// 查询错误 Span
GET jaeger-span-*/_search
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "tags.error": "true" } }
      ]
    }
  },
  "aggs": {
    "by_service": {
      "terms": {
        "field": "process.serviceName.keyword"
      },
      "aggs": {
        "by_error": {
          "terms": {
            "field": "tags.error_type.keyword"
          }
        }
      }
    }
  }
}

延迟分析

延迟分布

import pandas as pd
import numpy as np

def analyze_latency(spans):
    df = pd.DataFrame(spans)

    # 计算延迟分布
    latency_stats = df.groupby('operation_name')['duration'].agg([
        ('count', 'count'),
        ('mean', 'mean'),
        ('median', 'median'),
        ('p95', lambda x: np.percentile(x, 95)),
        ('p99', lambda x: np.percentile(x, 99)),
        ('max', 'max')
    ])

    return latency_stats

关键路径分析

┌─────────────────────────────────────────────────────────────┐
│                    关键路径分析                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  请求总耗时: 150ms                                          │
│                                                             │
│  关键路径(串行耗时):                                       │
│  ┌───────────────────────────────────────────────────────┐ │
│  │ API Gateway (10ms)                                    │ │
│  │    ▼                                                  │ │
│  │ User Service (30ms)                                   │ │
│  │    ▼                                                  │ │
│  │ Order Service (60ms) ← 瓶颈                          │ │
│  │    ▼                                                  │ │
│  │ Payment Service (35ms)                                │ │
│  │    ▼                                                  │ │
│  │ Response (5ms)                                        │ │
│  └───────────────────────────────────────────────────────┘ │
│                                                             │
│  优化建议:                                                  │
│  1. Order Service 并行处理子任务                           │
│  2. Payment Service 使用异步调用                           │
│  3. 添加缓存减少数据库查询                                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

异常检测

延迟异常

def detect_latency_anomaly(spans, threshold=2.0):
    """检测延迟异常"""
    df = pd.DataFrame(spans)

    # 计算移动平均和标准差
    df['rolling_mean'] = df['duration'].rolling(window=100).mean()
    df['rolling_std'] = df['duration'].rolling(window=100).std()

    # 检测异常
    df['is_anomaly'] = (
        df['duration'] > df['rolling_mean'] + threshold * df['rolling_std']
    )

    return df[df['is_anomaly']]

错误率异常

def detect_error_rate_anomaly(spans, window=60, threshold=0.05):
    """检测错误率异常"""
    df = pd.DataFrame(spans)

    # 按时间窗口计算错误率
    df['is_error'] = df['tags'].apply(lambda x: x.get('error', False))
    error_rate = df.resample(f'{window}s')['is_error'].mean()

    # 检测异常
    anomalies = error_rate[error_rate > threshold]

    return anomalies

调用链分析

调用深度分析

def analyze_call_depth(trace):
    """分析调用深度"""
    def get_depth(span, depth=0):
        depths = [depth]
        for child in span.get('children', []):
            depths.extend(get_depth(child, depth + 1))
        return depths

    depths = get_depth(trace['root_span'])

    return {
        'max_depth': max(depths),
        'avg_depth': sum(depths) / len(depths),
        'depth_distribution': Counter(depths)
    }

调用频率分析

def analyze_call_frequency(traces):
    """分析调用频率"""
    call_counts = Counter()

    for trace in traces:
        for span in trace['spans']:
            operation = span['operation_name']
            call_counts[operation] += 1

    return call_counts.most_common(20)

性能优化建议

基于追踪的优化

def generate_optimization_suggestions(trace):
    """生成优化建议"""
    suggestions = []

    for span in trace['spans']:
        # 检测慢数据库查询
        if span.get('db_statement') and span['duration'] > 100000000:
            suggestions.append({
                'type': 'slow_query',
                'span': span['span_id'],
                'suggestion': f"优化数据库查询: {span['db_statement'][:100]}"
            })

        # 检测重复调用
        if span.get('is_duplicate'):
            suggestions.append({
                'type': 'duplicate_call',
                'span': span['span_id'],
                'suggestion': '消除重复调用,使用缓存'
            })

        # 检测串行调用可并行化
        if span.get('can_parallelize'):
            suggestions.append({
                'type': 'parallelize',
                'span': span['span_id'],
                'suggestion': '将串行调用改为并行'
            })

    return suggestions

可视化分析

火焰图

def generate_flame_graph_data(trace):
    """生成火焰图数据"""
    def process_span(span, start_time=0):
        data = {
            'name': span['operation_name'],
            'value': span['duration'] / 1000000,  # 转换为毫秒
            'start': start_time,
            'children': []
        }

        current_time = start_time
        for child in span.get('children', []):
            child_data = process_span(child, current_time)
            data['children'].append(child_data)
            current_time += child['duration'] / 1000000

        return data

    return process_span(trace['root_span'])

Gantt 图

def generate_gantt_data(trace):
    """生成甘特图数据"""
    data = []

    for span in trace['spans']:
        data.append({
            'task': span['operation_name'],
            'start': span['start_time'],
            'end': span['start_time'] + span['duration'],
            'service': span['service_name'],
            'status': 'error' if span.get('error') else 'ok'
        })

    return data

小结

追踪分析要点:

  • 依赖分析:服务调用关系
  • 性能瓶颈:慢 Span 识别
  • 错误分析:错误聚类和定位
  • 延迟分析:分布和异常检测
  • 优化建议:数据驱动优化

下一章我们将学习性能优化。