跳转至

第七章:服务容错与降级

7.1 容错设计原则

微服务架构中,服务故障是常态。需要设计容错机制,确保部分服务故障不会导致整个系统崩溃。

故障类型

故障类型:
├── 网络故障
│   ├── 超时
│   ├── 连接失败
│   └── 网络分区
├── 服务故障
│   ├── 服务宕机
│   ├── 响应缓慢
│   └── 资源耗尽
└── 业务故障
    ├── 业务异常
    └── 数据错误

容错模式

容错设计模式:
├── 超时(Timeout)
│   └── 设置合理的超时时间
├── 重试(Retry)
│   └── 失败后自动重试
├── 熔断(Circuit Breaker)
│   └── 快速失败,防止级联故障
├── 降级(Fallback)
│   └── 返回备选响应
├── 限流(Rate Limiting)
│   └── 保护系统不被压垮
└── 隔离(Isolation)
    └── 隔离故障影响范围

7.2 超时策略

超时设置原则

# Timeout types (seconds).
# NOTE(review): these four constants are not referenced by the snippets shown here.
CONNECT_TIMEOUT = 5.0    # connect timeout
READ_TIMEOUT = 10.0      # read timeout
WRITE_TIMEOUT = 10.0     # write timeout
TOTAL_TIMEOUT = 30.0     # total (overall) timeout

# Per-service timeouts in seconds, tuned to each service's characteristics.
# Services missing from this table get whatever default the caller supplies
# (TimeoutClient uses {"connect": 5, "read": 10}).
SERVICE_TIMEOUTS = {
    "user-service": {"connect": 3, "read": 5},
    "order-service": {"connect": 3, "read": 10},
    "payment-service": {"connect": 5, "read": 30},  # payment needs a longer timeout
    "recommendation-service": {"connect": 2, "read": 3},  # recommendations can fail fast
}

超时实现

import httpx
from datetime import timedelta

class TimeoutClient:
    """HTTP client whose connect/read timeouts come from SERVICE_TIMEOUTS.

    Timeouts raised by httpx are translated into ServiceTimeoutError, which
    carries the service name and the configured timeout that was exceeded.
    Close the client with ``await client.aclose()`` or use it as an
    ``async with`` context manager.
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        timeout_config = SERVICE_TIMEOUTS.get(service_name, {"connect": 5, "read": 10})
        self.connect_timeout = timeout_config["connect"]
        self.read_timeout = timeout_config["read"]

        # httpx.Timeout needs either a default or all four of
        # connect/read/write/pool; giving only connect+read raises ValueError.
        # The read timeout therefore doubles as the default for write/pool.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(
                self.read_timeout,
                connect=self.connect_timeout,
                read=self.read_timeout,
            )
        )

    async def get(self, url: str, **kwargs):
        """GET ``url``; raises ServiceTimeoutError if httpx reports a timeout."""
        try:
            return await self.client.get(url, **kwargs)
        except httpx.TimeoutException as e:
            # httpx.Request has no ``timeout`` attribute, so report the
            # configured value that applies to the phase that timed out.
            if isinstance(e, httpx.ConnectTimeout):
                timeout = self.connect_timeout
            else:
                timeout = self.read_timeout
            raise ServiceTimeoutError(
                service=self.service_name,
                timeout=timeout
            ) from e

    async def aclose(self):
        """Release the underlying connection pool."""
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.aclose()

class ServiceTimeoutError(Exception):
    """Raised when a call to a downstream service exceeds its timeout.

    Attributes:
        service: name of the service that timed out.
        timeout: the timeout value (seconds) that was exceeded.
    """

    def __init__(self, service: str, timeout: float):
        message = f"服务 {service} 超时 ({timeout}s)"
        self.service, self.timeout = service, timeout
        super().__init__(message)

7.3 重试策略

重试原则

重试场景:
✅ 适合重试:
   - 网络抖动导致的临时故障
   - 服务短暂不可用
   - 幂等的读操作

❌ 不适合重试:
   - 非幂等的写操作(除非请求携带幂等键)
   - 业务逻辑错误
   - 认证授权失败

重试实现

import asyncio
import random
from functools import wraps
from typing import Callable, Type, Tuple

def retry(
    max_attempts: int = 3,
    wait_strategy: str = "exponential",
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """Retry decorator for async functions.

    Args:
        max_attempts: total number of calls, including the first one (>= 1).
        wait_strategy: "exponential" waits base_delay * 2**n, "linear" waits
            base_delay * (n + 1); any other value waits base_delay.
        base_delay: base wait in seconds.
        max_delay: upper bound in seconds on every wait, applied after jitter.
        exceptions: exception types that trigger a retry; anything else
            propagates immediately.

    Raises:
        ValueError: if max_attempts < 1 (checked when the decorator is built).

    Once all attempts are used up, the last exception is re-raised.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")

    def _backoff(attempt: int) -> float:
        """Seconds to wait after failed attempt ``attempt`` (0-based)."""
        if wait_strategy == "exponential":
            delay = base_delay * (2 ** attempt)
        elif wait_strategy == "linear":
            delay = base_delay * (attempt + 1)
        else:
            delay = base_delay
        # Jitter in [0.5x, 1.5x) de-synchronizes clients that fail together.
        # Clamp afterwards so jitter can never push a wait beyond max_delay.
        return min(delay * (0.5 + random.random()), max_delay)

    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: surface the last error

                    delay = _backoff(attempt)
                    print(f"重试 {attempt + 1}/{max_attempts}, 等待 {delay:.2f}s")
                    await asyncio.sleep(delay)

        return wrapper

    return decorator

# Usage example: only timeouts are retried; HTTP errors surface immediately.
@retry(max_attempts=3, wait_strategy="exponential", exceptions=(httpx.TimeoutException,))
async def call_user_service(user_id: str):
    """Fetch one user from user-service as JSON, retrying on timeouts."""
    user_url = f"http://user-service/users/{user_id}"
    async with httpx.AsyncClient() as http:
        reply = await http.get(user_url)
        reply.raise_for_status()
        return reply.json()

重试风暴防护

import time


class RetryBudget:
    """Retry budget that caps retries per time window to prevent retry storms.

    Call ``can_retry()`` before each retry. Once ``max_retries`` retries have
    been granted in the current window, further requests are refused until
    the window rolls over.
    """

    def __init__(self, max_retries_per_second: int = 10, window_seconds: float = 1.0):
        """
        Args:
            max_retries_per_second: retries granted per window.
            window_seconds: window length in seconds. The default of 1.0 keeps
                the per-second meaning of the first argument.
        """
        self.max_retries = max_retries_per_second
        self.window_seconds = window_seconds
        self.retry_count = 0
        # Monotonic clock: elapsed-time checks are immune to wall-clock jumps.
        self.last_reset = time.monotonic()

    def can_retry(self) -> bool:
        """Consume one retry from the budget; return False if it is exhausted."""
        now = time.monotonic()

        # Start a fresh window once the current one has elapsed.
        if now - self.last_reset >= self.window_seconds:
            self.retry_count = 0
            self.last_reset = now

        if self.retry_count >= self.max_retries:
            return False

        self.retry_count += 1
        return True

7.4 熔断器

熔断器状态

熔断器状态机(状态转换):

关闭 (Closed)     ──[失败次数 ≥ failure_threshold 且失败率 ≥ failure_rate_threshold]──→  打开 (Open)
打开 (Open)       ──[距上次失败已超过 recovery_timeout,下一次调用进入试探]──────────→  半开 (Half-Open)
半开 (Half-Open)  ──[累计 half_open_max_calls 次调用成功]──────────────────────────→  关闭 (Closed)
半开 (Half-Open)  ──[任意一次调用失败]──────────────────────────────────────────────→  打开 (Open)

熔断器实现

import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Optional
import asyncio

class CircuitState(Enum):
    """States of the circuit breaker."""
    CLOSED = "closed"        # closed: normal operation, calls pass through
    OPEN = "open"            # open: tripped, calls are rejected immediately
    HALF_OPEN = "half_open"  # half-open: probe calls test whether to recover


@dataclass
class CircuitStats:
    """Call counters accumulated since the breaker was last reset."""
    total_calls: int = 0           # completed calls, successful or failed
    failed_calls: int = 0          # completed calls that raised
    last_failure_time: float = 0   # time.time() of the most recent failure


class CircuitBreaker:
    """Async circuit breaker.

    * CLOSED -> OPEN: on a failure, when at least ``failure_threshold`` calls
      have failed AND failed/total >= ``failure_rate_threshold``.
    * OPEN -> HALF_OPEN: the first call made ``recovery_timeout`` seconds or
      more after the last failure is let through as a probe.
    * HALF_OPEN -> CLOSED: after ``half_open_max_calls`` successful calls
      (counters are reset).
    * HALF_OPEN -> OPEN: on any failure.

    Counters are cumulative since the last reset (no sliding window).
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        failure_rate_threshold: float = 0.5,
        recovery_timeout: float = 30.0,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.failure_rate_threshold = failure_rate_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls

        self.state = CircuitState.CLOSED
        self.stats = CircuitStats()
        self.half_open_calls = 0  # successes observed while HALF_OPEN
        self._lock = asyncio.Lock()

    async def call(self, func: Callable, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` through the breaker.

        Raises:
            CircuitBreakerOpen: if the breaker is OPEN and recovery_timeout
                has not yet elapsed since the last failure.
            Exception: anything ``func`` raises is recorded as a failure and
                re-raised unchanged.
        """
        async with self._lock:
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_recovery():
                    raise CircuitBreakerOpen("熔断器已打开")
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0

        # func runs outside the lock so concurrent calls are not serialized.
        try:
            result = await func(*args, **kwargs)
        except Exception:
            await self._on_failure()
            raise
        await self._on_success()
        return result

    def _should_attempt_recovery(self) -> bool:
        """True once recovery_timeout seconds have passed since the last failure."""
        return (
            self.stats.last_failure_time is not None and
            time.time() - self.stats.last_failure_time >= self.recovery_timeout
        )

    async def _on_success(self):
        """Record a successful call."""
        async with self._lock:
            # Successes must count toward total_calls; otherwise the failure
            # rate is failed/failed = 100% and failure_rate_threshold is moot.
            self.stats.total_calls += 1
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_calls += 1
                if self.half_open_calls >= self.half_open_max_calls:
                    # Enough probes succeeded: close the breaker again.
                    self._reset()

    async def _on_failure(self):
        """Record a failed call and trip the breaker if warranted."""
        async with self._lock:
            self.stats.total_calls += 1
            self.stats.failed_calls += 1
            self.stats.last_failure_time = time.time()

            if self.state == CircuitState.HALF_OPEN:
                # A failed probe re-opens immediately (and restarts the timer).
                self.state = CircuitState.OPEN
            elif self._should_open():
                self.state = CircuitState.OPEN

    def _should_open(self) -> bool:
        """True when both the absolute and relative failure thresholds are met."""
        if self.stats.failed_calls >= self.failure_threshold:
            failure_rate = self.stats.failed_calls / max(self.stats.total_calls, 1)
            return failure_rate >= self.failure_rate_threshold
        return False

    def _reset(self):
        """Return to CLOSED with fresh counters."""
        self.state = CircuitState.CLOSED
        self.stats = CircuitStats()
        self.half_open_calls = 0


class CircuitBreakerOpen(Exception):
    """Raised by CircuitBreaker.call while the breaker is open."""
    pass

# Usage example: a shared breaker guarding every payment call
payment_breaker = CircuitBreaker(
    failure_threshold=5,
    failure_rate_threshold=0.5,
    recovery_timeout=30,
)


async def call_payment_service(order_id: str):
    """Pay for ``order_id`` via payment-service, guarded by payment_breaker."""
    guarded = payment_breaker.call
    return await guarded(_do_payment_call, order_id)


async def _do_payment_call(order_id: str):
    """POST the order to payment-service; raises on HTTP error status."""
    body = {"order_id": order_id}
    async with httpx.AsyncClient() as http:
        reply = await http.post("http://payment-service/pay", json=body)
        reply.raise_for_status()
        return reply.json()

7.5 降级策略

降级类型

降级策略:
├── 自动降级
│   ├── 熔断降级:熔断器打开时自动降级
│   ├── 超时降级:超时后返回默认值
│   └── 限流降级:超过限流阈值时降级
└── 手动降级
    ├── 功能开关:通过配置关闭非核心功能
    └── 服务降级:人工关闭部分服务

降级实现

from typing import Any, Callable, Optional
from functools import wraps

class FallbackStrategy:
    """Decorator factories that degrade an async call when it raises."""

    @staticmethod
    def default_value(value: Any):
        """On any exception, return a fresh deep copy of ``value``.

        The value is copied on every fallback so a caller that mutates the
        result cannot corrupt the default seen by later fallbacks.
        """
        import copy

        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    print(f"服务降级: {func.__name__}, 返回默认值")
                    return copy.deepcopy(value)
            return wrapper
        return decorator

    @staticmethod
    def from_cache(cache_key: str, cache_backend: Any = None):
        """On any exception, return the cached value for this call.

        Args:
            cache_key: ``str.format`` template filled from the wrapped
                function's arguments by parameter name, e.g. "user:{user_id}".
                If the template cannot be filled, it is used verbatim.
            cache_backend: object with an awaitable ``get(key)``. Defaults to
                the module-level ``cache`` object (defined elsewhere).

        The original exception is re-raised when the cache returns None.
        """
        import inspect

        def decorator(func: Callable):
            try:
                signature = inspect.signature(func)
            except (TypeError, ValueError):
                signature = None  # not introspectable: use cache_key as-is

            def _resolve_key(args, kwargs) -> str:
                """Fill cache_key from the call's bound arguments."""
                if signature is None:
                    return cache_key
                try:
                    bound = signature.bind_partial(*args, **kwargs)
                except TypeError:
                    return cache_key
                bound.apply_defaults()
                try:
                    return cache_key.format(**bound.arguments)
                except (KeyError, IndexError, AttributeError, ValueError):
                    return cache_key

            @wraps(func)
            async def wrapper(*args, **kwargs):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    backend = cache if cache_backend is None else cache_backend
                    cached = await backend.get(_resolve_key(args, kwargs))
                    # `is not None` so legitimately falsy cached values ({}, 0, "") are served
                    if cached is not None:
                        print(f"服务降级: {func.__name__}, 返回缓存值")
                        return cached
                    raise
            return wrapper
        return decorator

    @staticmethod
    def call_alternative(backup_func: Callable):
        """On any exception, await ``backup_func`` with the same arguments."""
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    print(f"服务降级: {func.__name__}, 调用备选服务")
                    return await backup_func(*args, **kwargs)
            return wrapper
        return decorator

# Usage examples
@FallbackStrategy.default_value({"recommendations": []})
async def get_recommendations(user_id: str):
    """Fetch recommendations; degrades to {"recommendations": []} on error.

    raise_for_status() turns 4xx/5xx replies into exceptions, so the
    fallback covers HTTP errors as well as transport failures.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://rec-service/recommend/{user_id}")
        response.raise_for_status()
        return response.json()


@FallbackStrategy.from_cache("user:{user_id}")
async def get_user_profile(user_id: str):
    """Fetch a user profile; degrades to a cached copy on error.

    raise_for_status() makes HTTP error replies trigger the fallback too.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://user-service/profile/{user_id}")
        response.raise_for_status()
        return response.json()


async def get_product_from_db(product_id: str):
    """Fetch a product from the database (backup path)."""
    # ...


@FallbackStrategy.call_alternative(get_product_from_db)
async def get_product_from_es(product_id: str):
    """Fetch a product from Elasticsearch (primary path); errors fall back to the DB."""
    # ...

功能开关降级

from dataclasses import dataclass
from typing import Dict

@dataclass
class FeatureFlag:
    """A named on/off switch for a feature."""
    name: str                    # unique flag name used for lookups
    enabled: bool                # True = feature on, False = degraded
    fallback_value: Any = None   # served by execute_with_flag while disabled


class FeatureFlagManager:
    """In-memory registry of feature flags, used for manual degradation."""

    def __init__(self):
        self.flags: Dict[str, FeatureFlag] = {}

    def register(self, name: str, enabled: bool = True, fallback: Any = None):
        """Register (or replace) a flag; ``fallback`` becomes its fallback_value."""
        self.flags[name] = FeatureFlag(name, enabled, fallback)

    def is_enabled(self, name: str) -> bool:
        """Return the flag's state; unregistered flags count as disabled."""
        flag = self.flags.get(name)
        return flag.enabled if flag else False

    def set_enabled(self, name: str, enabled: bool):
        """Switch a registered flag on or off; unknown names are ignored."""
        if name in self.flags:
            self.flags[name].enabled = enabled

    async def execute_with_flag(
        self,
        flag_name: str,
        func: Callable,
        fallback: Optional[Callable] = None
    ):
        """Run ``func`` if the flag is on, otherwise degrade.

        When disabled: await ``fallback()`` if given; else return a deep copy
        of the flag's registered fallback_value (None for unknown flags).
        The copy keeps callers from mutating the shared registered value.
        """
        import copy

        if self.is_enabled(flag_name):
            return await func()
        if fallback:
            return await fallback()
        flag = self.flags.get(flag_name)
        return copy.deepcopy(flag.fallback_value) if flag is not None else None

# Usage example: register flags at startup
flag_manager = FeatureFlagManager()
flag_manager.register(name="recommendation", enabled=True)
flag_manager.register(name="advanced_search", enabled=False)

# Runtime degradation: switch the recommendation feature off
flag_manager.set_enabled(name="recommendation", enabled=False)

7.6 隔离策略

线程池隔离

import asyncio
from concurrent.futures import ThreadPoolExecutor

class IsolatedExecutor:
    """Run blocking callables on a dedicated thread pool per service.

    Each service name gets its own ThreadPoolExecutor (created lazily with
    ``max_workers`` threads), so a slow service can only exhaust its own
    pool. Call ``shutdown()`` to release all pools.
    """

    def __init__(self, max_workers: int = 10):
        self.executors = {}
        self.max_workers = max_workers

    def get_executor(self, service_name: str) -> ThreadPoolExecutor:
        """Return the service's thread pool, creating it on first use."""
        executor = self.executors.get(service_name)
        if executor is None:
            executor = ThreadPoolExecutor(
                max_workers=self.max_workers,
                thread_name_prefix=f"{service_name}-"
            )
            self.executors[service_name] = executor
        return executor

    async def execute(self, service_name: str, func: Callable, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` on the service's pool and await the result."""
        import functools

        executor = self.get_executor(service_name)
        # get_running_loop() is the supported API inside a coroutine.
        loop = asyncio.get_running_loop()
        # run_in_executor takes no kwargs, so bind them with partial.
        return await loop.run_in_executor(
            executor,
            functools.partial(func, *args, **kwargs)
        )

    def shutdown(self, wait: bool = True) -> None:
        """Shut down every pool; later calls lazily create fresh pools."""
        executors, self.executors = self.executors, {}
        for executor in executors.values():
            executor.shutdown(wait=wait)

# Usage example
executor = IsolatedExecutor(max_workers=5)


async def run_isolated_example(order_id: str, user_id: str):
    """Dispatch blocking work for two services onto separate thread pools.

    ``await`` is only valid inside a coroutine, so the calls live here rather
    than at module level. ``process_payment`` and ``send_notification`` are
    placeholders for blocking functions — TODO define them.
    """
    # Different services use different thread pools
    await executor.execute("payment", process_payment, order_id)
    await executor.execute("notification", send_notification, user_id)

舱壁模式

class Bulkhead:
    """Bulkhead isolation: cap how many calls may run at the same time.

    Calls beyond ``max_concurrent`` wait on the semaphore (they are queued,
    not rejected). ``current_calls`` counts the calls currently executing.
    """

    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.current_calls = 0

    async def execute(self, func: Callable, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` once a slot is free."""
        await self.semaphore.acquire()
        self.current_calls += 1
        try:
            return await func(*args, **kwargs)
        finally:
            self.current_calls -= 1
            self.semaphore.release()

# One bulkhead per downstream service, sized to that service's capacity
payment_bulkhead = Bulkhead(max_concurrent=5)
notification_bulkhead = Bulkhead(max_concurrent=20)


async def call_payment(order_id: str):
    """Call the payment service through its bulkhead (at most 5 concurrent).

    Delegates to ``_do_payment_call``, the raw payment request helper defined
    with the circuit-breaker example.
    """
    return await payment_bulkhead.execute(_do_payment_call, order_id)

7.7 容错配置最佳实践

# Example fault-tolerance configuration, one entry per downstream service
services:
  user-service:
    timeout:
      connect: 3s
      read: 5s
    retry:
      max_attempts: 3
      wait_strategy: exponential
    circuit_breaker:
      failure_threshold: 5
      failure_rate_threshold: 0.5
      recovery_timeout: 30s
    fallback:
      enabled: true
      strategy: cache

  payment-service:
    timeout:
      connect: 5s
      read: 30s
    retry:
      # NOTE(review): payment is a non-idempotent write; retrying it is only
      # safe if each request carries an idempotency key — confirm before use.
      max_attempts: 2
      wait_strategy: fixed
    circuit_breaker:
      failure_threshold: 3
      failure_rate_threshold: 0.3
      recovery_timeout: 60s
    fallback:
      enabled: false  # payments are never degraded

  recommendation-service:
    timeout:
      connect: 2s
      read: 3s
    retry:
      # presumably counts the first call (as the retry decorator does), i.e. no retries
      max_attempts: 1
    circuit_breaker:
      failure_threshold: 10
      failure_rate_threshold: 0.8
      recovery_timeout: 10s
    fallback:
      enabled: true
      strategy: default_value
      default: []

7.8 小结

本章介绍了服务容错与降级的策略:

  1. 超时是第一道防线,需要合理设置
  2. 重试需要考虑幂等性和重试风暴
  3. 熔断器防止级联故障
  4. 降级保证核心功能可用
  5. 隔离限制故障影响范围

思考题

  1. 你的系统如何处理服务故障?
  2. 哪些服务适合降级,哪些不适合?
  3. 如何测试容错机制的有效性?

参考资料