第七章:服务容错与降级¶
7.1 容错设计原则¶
微服务架构中,服务故障是常态。需要设计容错机制,确保部分服务故障不会导致整个系统崩溃。
故障类型¶
故障类型:
├── 网络故障
│ ├── 超时
│ ├── 连接失败
│ └── 网络分区
├── 服务故障
│ ├── 服务宕机
│ ├── 响应缓慢
│ └── 资源耗尽
└── 业务故障
├── 业务异常
└── 数据错误
容错模式¶
容错设计模式:
├── 超时(Timeout)
│ └── 设置合理的超时时间
├── 重试(Retry)
│ └── 失败后自动重试
├── 熔断(Circuit Breaker)
│ └── 快速失败,防止级联故障
├── 降级(Fallback)
│ └── 返回备选响应
├── 限流(Rate Limiting)
│ └── 保护系统不被压垮
└── 隔离(Isolation)
└── 隔离故障影响范围
7.2 超时策略¶
超时设置原则¶
# Timeout categories, in seconds
CONNECT_TIMEOUT = 5.0  # connection timeout
READ_TIMEOUT = 10.0  # read timeout
WRITE_TIMEOUT = 10.0  # write timeout
TOTAL_TIMEOUT = 30.0  # overall timeout

# Per-service timeouts (seconds), tuned to each service's characteristics
SERVICE_TIMEOUTS = {
    "user-service": dict(connect=3, read=5),
    "order-service": dict(connect=3, read=10),
    # payment needs a longer timeout
    "payment-service": dict(connect=5, read=30),
    # recommendations can fail fast
    "recommendation-service": dict(connect=2, read=3),
}
超时实现¶
import httpx
from datetime import timedelta
class TimeoutClient:
    """Async HTTP client that applies a per-service timeout profile.

    The profile is looked up in ``SERVICE_TIMEOUTS`` by service name; services
    without an entry use connect=5s / read=10s. httpx timeouts are re-raised
    as ``ServiceTimeoutError``. Call ``aclose()`` (or use ``async with``) to
    release the underlying connection pool.
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.timeout_config = SERVICE_TIMEOUTS.get(service_name, {"connect": 5, "read": 10})
        # httpx.Timeout needs either a default value or all four of
        # connect/read/write/pool; passing only connect+read raises ValueError.
        # The read value is the default, so write and pool inherit it.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(
                self.timeout_config["read"],
                connect=self.timeout_config["connect"],
            )
        )

    async def get(self, url: str, **kwargs):
        """GET ``url`` using this service's timeouts.

        Raises:
            ServiceTimeoutError: on any httpx timeout (connect, read, write or pool).
        """
        try:
            return await self.client.get(url, **kwargs)
        except httpx.TimeoutException as e:
            # httpx.Request has no ``timeout`` attribute, so report the
            # configured limit instead: connect for ConnectTimeout, otherwise
            # the read value (which write/pool also default to above).
            phase = "connect" if isinstance(e, httpx.ConnectTimeout) else "read"
            raise ServiceTimeoutError(
                service=self.service_name,
                timeout=self.timeout_config[phase],
            ) from e

    async def aclose(self) -> None:
        """Close the underlying httpx client and its connections."""
        await self.client.aclose()

    async def __aenter__(self) -> "TimeoutClient":
        return self

    async def __aexit__(self, *exc_info) -> None:
        await self.aclose()
class ServiceTimeoutError(Exception):
    """Raised when a call to a downstream service times out.

    Attributes:
        service: name of the service that timed out.
        timeout: the timeout value (seconds) reported for the call.
    """

    def __init__(self, service: str, timeout: float):
        message = f"服务 {service} 超时 ({timeout}s)"
        super().__init__(message)
        self.service, self.timeout = service, timeout
7.3 重试策略¶
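重试原则¶
- 只重试幂等操作(或携带幂等键的请求),否则可能导致重复下单、重复扣款
- 只重试瞬时性故障(超时、连接失败、5xx 等),业务错误(如 4xx)不应重试
- 限制最大重试次数,采用指数退避并加入随机抖动,避免大量客户端同时重试
- 控制全局重试量(重试预算),防止重试风暴放大故障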
重试实现¶
import asyncio
import random
from functools import wraps
from typing import Callable, Type, Tuple
def retry(
    max_attempts: int = 3,
    wait_strategy: str = "exponential",
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """Retry decorator for async functions.

    Args:
        max_attempts: total number of calls including the first; must be >= 1.
        wait_strategy: "exponential" waits base_delay * 2**n, "linear" waits
            base_delay * (n + 1); any other value waits base_delay every time.
        base_delay: base wait in seconds.
        max_delay: upper bound in seconds on every wait, jitter included.
        exceptions: exception types that trigger a retry; any other exception
            propagates immediately.

    Raises:
        ValueError: at decoration time if max_attempts < 1.

    When every attempt fails, the exception from the last attempt is re-raised.
    """
    if max_attempts < 1:
        # Zero attempts would leave the wrapper with nothing to return or re-raise.
        raise ValueError(f"max_attempts 必须 >= 1, 当前为 {max_attempts}")

    def _delay_for(attempt: int) -> float:
        """Seconds to wait after the failed attempt with 0-based index ``attempt``."""
        if wait_strategy == "exponential":
            delay = base_delay * (2 ** attempt)
        elif wait_strategy == "linear":
            delay = base_delay * (attempt + 1)
        else:
            delay = base_delay
        delay = min(delay, max_delay)
        # Random jitter in [0.5, 1.5) spreads out clients that failed together;
        # cap again so the jittered wait never exceeds max_delay.
        return min(delay * (0.5 + random.random()), max_delay)

    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: surface the last error unchanged
                    delay = _delay_for(attempt)
                    print(f"重试 {attempt + 1}/{max_attempts}, 等待 {delay:.2f}s")
                    await asyncio.sleep(delay)
        return wrapper
    return decorator
# Usage example: retry timeouts only, with exponential backoff
@retry(max_attempts=3, wait_strategy="exponential", exceptions=(httpx.TimeoutException,))
async def call_user_service(user_id: str):
    """Fetch one user from user-service; 4xx/5xx raise via raise_for_status()."""
    url = f"http://user-service/users/{user_id}"
    async with httpx.AsyncClient() as http:
        resp = await http.get(url)
        resp.raise_for_status()
        return resp.json()
重试风暴防护¶
class RetryBudget:
    """Retry budget that prevents retry storms.

    Allows at most ``max_retries_per_second`` retries per fixed one-second
    window. The window resets on the first check made at least one second
    after the previous reset.
    """

    def __init__(self, max_retries_per_second: int = 10):
        self.max_retries = max_retries_per_second
        self.retry_count = 0
        # Monotonic clock: a wall-clock jump backwards (NTP, manual change)
        # would otherwise keep the window from resetting.
        self.last_reset = time.monotonic()

    def can_retry(self) -> bool:
        """Consume one retry from the budget.

        Returns True if the retry is allowed; False once the current window's
        budget is exhausted.
        """
        now = time.monotonic()
        # Start a new window once a second has elapsed.
        if now - self.last_reset >= 1.0:
            self.retry_count = 0
            self.last_reset = now

        if self.retry_count >= self.max_retries:
            return False

        self.retry_count += 1
        return True
7.4 熔断器¶
熔断器状态¶
熔断器状态机:
关闭 (Closed)    ──[失败率超阈值]──→   打开 (Open)
打开 (Open)      ──[超过恢复超时]──→   半开 (Half-Open)
半开 (Half-Open) ──[试探调用成功]──→   关闭 (Closed)
半开 (Half-Open) ──[试探调用失败]──→   打开 (Open)
熔断器实现¶
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Optional
import asyncio
class CircuitState(Enum):
    """Circuit breaker states."""

    CLOSED = "closed"  # closed: calls flow through normally
    OPEN = "open"  # open (tripped): calls are rejected immediately
    HALF_OPEN = "half_open"  # half-open: limited trial calls probe for recovery


@dataclass
class CircuitStats:
    """Counters the breaker uses to decide when to open."""

    total_calls: int = 0  # completed calls, successes and failures
    failed_calls: int = 0  # completed calls that raised
    last_failure_time: float = 0  # time.monotonic() of the latest failure


class CircuitBreaker:
    """Async circuit breaker.

    Transitions:
      - CLOSED -> OPEN: once at least ``failure_threshold`` failures have been
        recorded and the failure rate (failed / total completed calls) is
        >= ``failure_rate_threshold``.
      - OPEN -> HALF_OPEN: on the first call made at least
        ``recovery_timeout`` seconds after the last failure.
      - HALF_OPEN: at most ``half_open_max_calls`` trial calls are admitted
        (minimum 1); when ``half_open_max_calls`` of them succeed the breaker
        closes and its statistics reset. Any failure reopens it.

    Counters accumulate while CLOSED (no sliding window); they are cleared
    only when the breaker closes from HALF_OPEN.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        failure_rate_threshold: float = 0.5,
        recovery_timeout: float = 30.0,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.failure_rate_threshold = failure_rate_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls

        self.state = CircuitState.CLOSED
        self.stats = CircuitStats()
        # Trial calls admitted / succeeded in the current HALF_OPEN period.
        self.half_open_calls = 0
        self._half_open_successes = 0
        self._lock = asyncio.Lock()

    async def call(self, func: Callable, *args, **kwargs):
        """Run ``await func(*args, **kwargs)`` through the breaker.

        Raises:
            CircuitBreakerOpen: the breaker is OPEN and the recovery timeout
                has not elapsed, or it is HALF_OPEN and every trial slot is taken.
            Exception: anything ``func`` raises is recorded as a failure and re-raised.
        """
        async with self._lock:
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_recovery():
                    raise CircuitBreakerOpen("熔断器已打开")
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                self._half_open_successes = 0
            if self.state == CircuitState.HALF_OPEN:
                # Only a bounded number of probes may hit a recovering service.
                if self.half_open_calls >= max(1, self.half_open_max_calls):
                    raise CircuitBreakerOpen("熔断器半开,试探调用已满")
                self.half_open_calls += 1

        # The lock is released here: func runs without blocking other callers.
        try:
            result = await func(*args, **kwargs)
        except Exception:
            await self._on_failure()
            raise
        await self._on_success()
        return result

    def _should_attempt_recovery(self) -> bool:
        """Return True once ``recovery_timeout`` has elapsed since the last failure."""
        return time.monotonic() - self.stats.last_failure_time >= self.recovery_timeout

    async def _on_success(self):
        """Record a successful call; close the breaker after enough HALF_OPEN successes."""
        async with self._lock:
            # Successes must count toward total_calls, or the failure rate is always 100%.
            self.stats.total_calls += 1
            if self.state == CircuitState.HALF_OPEN:
                self._half_open_successes += 1
                if self._half_open_successes >= self.half_open_max_calls:
                    self._reset()

    async def _on_failure(self):
        """Record a failed call and open the breaker if required."""
        async with self._lock:
            self.stats.total_calls += 1
            self.stats.failed_calls += 1
            self.stats.last_failure_time = time.monotonic()

            if self.state == CircuitState.HALF_OPEN:
                # A failed probe reopens the breaker immediately.
                self.state = CircuitState.OPEN
            elif self._should_open():
                self.state = CircuitState.OPEN

    def _should_open(self) -> bool:
        """Return True when both the failure-count and failure-rate thresholds are met."""
        if self.stats.failed_calls >= self.failure_threshold:
            failure_rate = self.stats.failed_calls / self.stats.total_calls
            return failure_rate >= self.failure_rate_threshold
        return False

    def _reset(self):
        """Return to CLOSED with fresh statistics."""
        self.state = CircuitState.CLOSED
        self.stats = CircuitStats()
        self.half_open_calls = 0
        self._half_open_successes = 0


class CircuitBreakerOpen(Exception):
    """Raised when the circuit breaker rejects a call."""
    pass
# Usage example: one shared breaker guarding the payment service
payment_breaker = CircuitBreaker(
    recovery_timeout=30,
    failure_threshold=5,
    failure_rate_threshold=0.5,
)
async def call_payment_service(order_id: str):
    """Call the payment service through ``payment_breaker`` (circuit-breaker protected)."""
    result = await payment_breaker.call(_do_payment_call, order_id)
    return result
async def _do_payment_call(order_id: str):
    """POST ``order_id`` to the payment service and return the decoded JSON reply.

    4xx/5xx responses raise ``httpx.HTTPStatusError`` via raise_for_status().
    """
    payload = {"order_id": order_id}
    async with httpx.AsyncClient() as http:
        resp = await http.post("http://payment-service/pay", json=payload)
        resp.raise_for_status()
        return resp.json()
7.5 降级策略¶
降级类型¶
降级策略:
├── 自动降级
│ ├── 熔断降级:熔断器打开时自动降级
│ ├── 超时降级:超时后返回默认值
│ └── 限流降级:超过限流阈值时降级
└── 手动降级
├── 功能开关:通过配置关闭非核心功能
└── 服务降级:人工关闭部分服务
降级实现¶
from typing import Any, Callable, Optional
from functools import wraps
class FallbackStrategy:
    """Decorator factories that degrade an async call when it raises."""

    @staticmethod
    def default_value(value: Any):
        """On any exception, return a fresh deep copy of ``value``.

        A copy is returned so that a caller mutating the fallback (e.g.
        appending to a default list) cannot change what later calls receive.
        """
        import copy

        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    print(f"服务降级: {func.__name__}, 返回默认值")
                    return copy.deepcopy(value)
            return wrapper
        return decorator

    @staticmethod
    def _render_cache_key(template: str, func: Callable, args: tuple, kwargs: dict) -> str:
        """Fill ``template``'s ``{param}`` placeholders from the call's bound arguments.

        Defaults are applied, so placeholders may name parameters the caller
        omitted. Raises KeyError if the template names an unknown parameter.
        """
        import inspect

        bound = inspect.signature(func).bind(*args, **kwargs)
        bound.apply_defaults()
        return template.format(**bound.arguments)

    @staticmethod
    def from_cache(cache_key: str):
        """On any exception, serve a cached value; re-raise if none is cached.

        ``cache_key`` is a template such as ``"user:{user_id}"`` whose
        placeholders are filled from the decorated function's arguments, so
        each call looks up its own entry.
        """
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    key = FallbackStrategy._render_cache_key(cache_key, func, args, kwargs)
                    # NOTE(review): ``cache`` is an async cache client that is not
                    # defined in this snippet; assumes get() returns None on a miss.
                    cached = await cache.get(key)
                    if cached is not None:
                        print(f"服务降级: {func.__name__}, 返回缓存值")
                        return cached
                    raise
            return wrapper
        return decorator

    @staticmethod
    def call_alternative(backup_func: Callable):
        """On any exception, call ``backup_func`` with the same arguments."""
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    print(f"服务降级: {func.__name__}, 调用备选服务")
                    return await backup_func(*args, **kwargs)
            return wrapper
        return decorator
# Usage examples
@FallbackStrategy.default_value({"recommendations": []})
async def get_recommendations(user_id: str):
    """Fetch recommendations; on any failure, degrade to an empty list.

    raise_for_status() turns 4xx/5xx replies into exceptions so they also take
    the fallback path, instead of the error body being returned as data.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://rec-service/recommend/{user_id}")
        response.raise_for_status()
        return response.json()
@FallbackStrategy.from_cache("user:{user_id}")
async def get_user_profile(user_id: str):
    """Fetch a user profile; on failure, degrade to the cached copy.

    raise_for_status() makes 4xx/5xx replies raise, so they fall back to the
    cache instead of the error body being returned as a profile.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://user-service/profile/{user_id}")
        response.raise_for_status()
        return response.json()
async def get_product_from_db(product_id: str):
    """Fetch a product from the database (backup path).

    Placeholder: the real query is omitted from this example. A function body
    made only of a comment is a SyntaxError, so the stub raises explicitly.
    """
    raise NotImplementedError
@FallbackStrategy.call_alternative(get_product_from_db)
async def get_product_from_es(product_id: str):
    """Fetch a product from Elasticsearch (primary path).

    Any exception makes the decorator call get_product_from_db with the same
    arguments. Placeholder: the real ES query is omitted from this example, so
    the stub raises instead of having a comment-only (invalid) body.
    """
    raise NotImplementedError
功能开关降级¶
from dataclasses import dataclass
from typing import Dict
@dataclass
class FeatureFlag:
    """A named on/off feature switch."""

    name: str  # key the flag is registered under
    enabled: bool  # True while the feature is switched on
    fallback_value: Any = None  # returned by execute_with_flag when off and no fallback callable is given


class FeatureFlagManager:
    """In-memory registry of feature flags, used for manual degradation.

    Flags that were never registered are treated as disabled.
    """

    def __init__(self):
        self.flags: Dict[str, FeatureFlag] = {}

    def register(self, name: str, enabled: bool = True, fallback: Any = None):
        """Register (or replace) flag ``name`` with its initial state and fallback value."""
        self.flags[name] = FeatureFlag(name, enabled, fallback)

    def is_enabled(self, name: str) -> bool:
        """Return True only if ``name`` is registered and enabled."""
        flag = self.flags.get(name)
        return flag is not None and flag.enabled

    def set_enabled(self, name: str, enabled: bool) -> bool:
        """Switch a registered flag on or off.

        Returns True if the flag exists and was updated. Returns False for an
        unknown name (e.g. a typo), so a failed runtime switch is detectable.
        """
        flag = self.flags.get(name)
        if flag is None:
            return False
        flag.enabled = enabled
        return True

    async def execute_with_flag(
        self,
        flag_name: str,
        func: Callable,
        fallback: Optional[Callable] = None
    ):
        """Run ``func`` if the flag is on; otherwise degrade.

        When the flag is off, ``await fallback()`` is returned if a fallback
        callable was given. Otherwise the flag's registered fallback value is
        returned (None for unknown flags).
        """
        if self.is_enabled(flag_name):
            return await func()
        if fallback is not None:
            return await fallback()
        flag = self.flags.get(flag_name)
        return flag.fallback_value if flag is not None else None
# Usage example
flag_manager = FeatureFlagManager()
flag_manager.register(name="recommendation", enabled=True)
flag_manager.register(name="advanced_search", enabled=False)

# Dynamic degradation: switch the recommendation feature off at runtime
flag_manager.set_enabled(name="recommendation", enabled=False)
7.6 隔离策略¶
线程池隔离¶
import asyncio
from concurrent.futures import ThreadPoolExecutor
class IsolatedExecutor:
    """Run blocking callables in per-service thread pools.

    Each service name gets its own lazily created ThreadPoolExecutor with
    ``max_workers`` threads. Work for one service therefore never occupies
    another service's threads.
    """

    def __init__(self, max_workers: int = 10):
        self.executors = {}  # service name -> ThreadPoolExecutor
        self.max_workers = max_workers

    def get_executor(self, service_name: str) -> ThreadPoolExecutor:
        """Return the service's thread pool, creating it on first use."""
        pool = self.executors.get(service_name)
        if pool is None:
            pool = ThreadPoolExecutor(
                max_workers=self.max_workers,
                thread_name_prefix=f"{service_name}-"
            )
            self.executors[service_name] = pool
        return pool

    async def execute(self, service_name: str, func: Callable, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` in the service's pool and await its result."""
        executor = self.get_executor(service_name)
        # get_running_loop() is the supported way to reach the loop from a coroutine.
        loop = asyncio.get_running_loop()
        # run_in_executor forwards only positional args, hence the lambda for kwargs.
        return await loop.run_in_executor(
            executor,
            lambda: func(*args, **kwargs)
        )

    def shutdown(self, wait: bool = True) -> None:
        """Shut down every pool (waiting for running work if ``wait``) and forget them."""
        for pool in self.executors.values():
            pool.shutdown(wait=wait)
        self.executors.clear()
# Usage example
executor = IsolatedExecutor(max_workers=5)


async def run_isolated_calls(order_id, user_id):
    """Send each service's blocking work to that service's own thread pool.

    ``await`` is only valid inside ``async def`` in a regular module, so the
    example calls live in a coroutine. ``process_payment`` and
    ``send_notification`` are placeholders for blocking functions defined
    elsewhere.
    """
    # Different services use different thread pools
    await executor.execute("payment", process_payment, order_id)
    await executor.execute("notification", send_notification, user_id)
舱壁模式¶
class Bulkhead:
    """Bulkhead isolation: cap how many calls may run concurrently.

    Callers beyond ``max_concurrent`` wait on an asyncio.Semaphore until a
    slot frees up. ``current_calls`` reports how many calls are running.
    """

    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.current_calls = 0

    async def execute(self, func: Callable, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` once a bulkhead slot is free."""
        await self.semaphore.acquire()
        self.current_calls += 1
        try:
            return await func(*args, **kwargs)
        finally:
            self.current_calls -= 1
            self.semaphore.release()
# Different services use different bulkheads
payment_bulkhead = Bulkhead(max_concurrent=5)
notification_bulkhead = Bulkhead(max_concurrent=20)


async def call_payment(order_id):
    """Call the payment service with at most 5 concurrent in-flight calls.

    Uses ``_do_payment_call`` (the payment request helper); no ``_do_payment``
    is defined anywhere.
    """
    return await payment_bulkhead.execute(_do_payment_call, order_id)
7.7 容错配置最佳实践¶
# Fault-tolerance configuration example
services:
  user-service:
    timeout: {connect: 3s, read: 5s}
    retry: {max_attempts: 3, wait_strategy: exponential}
    circuit_breaker:
      failure_threshold: 5
      failure_rate_threshold: 0.5
      recovery_timeout: 30s
    fallback: {enabled: true, strategy: cache}

  payment-service:
    timeout: {connect: 5s, read: 30s}
    # NOTE(review): retrying payments is only safe if the payment API is
    # idempotent (e.g. requests carry an idempotency key) — confirm.
    retry: {max_attempts: 2, wait_strategy: fixed}
    circuit_breaker:
      failure_threshold: 3
      failure_rate_threshold: 0.3
      recovery_timeout: 60s
    fallback:
      enabled: false  # payments are never degraded

  recommendation-service:
    timeout: {connect: 2s, read: 3s}
    retry: {max_attempts: 1}  # a single attempt, i.e. no retry
    circuit_breaker:
      failure_threshold: 10
      failure_rate_threshold: 0.8
      recovery_timeout: 10s
    fallback:
      enabled: true
      strategy: default_value
      default: []
7.8 小结¶
本章介绍了服务容错与降级的策略:
- 超时是第一道防线,需要合理设置
- 重试需要考虑幂等性和重试风暴
- 熔断器防止级联故障
- 降级保证核心功能可用
- 隔离限制故障影响范围
思考题¶
- 你的系统如何处理服务故障?
- 哪些服务适合降级,哪些不适合?
- 如何测试容错机制的有效性?