跳转至

第二章:CAP 定理与 BASE 理论

2.1 CAP 定理

CAP 定理是分布式系统设计的基础理论,由 Eric Brewer 在 2000 年提出。

定义

CAP 定理:
一个分布式系统最多只能同时满足以下三项中的两项:

C - Consistency(一致性)
    所有节点在同一时刻看到的数据是一致的

A - Availability(可用性)
    每个请求都能在合理时间内得到响应(成功或失败)

P - Partition Tolerance(分区容错)
    网络分区发生时,系统仍能继续运行

三者详解

Consistency(一致性):

┌─────────┐     写入 X=1      ┌─────────┐
│  节点 A  │ ───────────────→ │  节点 B  │
│  X = 1  │                   │  X = ?  │
└─────────┘                   └─────────┘
     │                              │
     │ 读取 X                       │ 读取 X
     ▼                              ▼
  返回 1                         必须返回 1

一致性保证:
- 写操作完成后,所有后续读操作都能读到最新值
- 所有节点数据同步完成后,写操作才返回成功

Availability(可用性):

┌─────────┐     请求           ┌─────────┐
│  节点 A  │ ───────────────→ │  节点 B  │
│  (正常)  │                   │  (故障)  │
└─────────┘                   └─────────┘
     │                              │
     │ 响应                         │ 无响应
     ▼                              ▼
  返回结果                      系统仍可用

可用性保证:
- 每个请求都能得到响应
- 不保证返回最新数据
- 部分节点故障不影响整体

Partition Tolerance(分区容错):

┌─────────┐                     ┌─────────┐
│  节点 A  │      网络分区       │  节点 B  │
│  X = 1  │ ╳ ╳ ╳ ╳ ╳ ╳ ╳ ╳ ╳ │  X = 2  │
└─────────┘                     └─────────┘
     │                              │
     │ 无法通信                      │ 无法通信
     ▼                              ▼
  系统仍能运行                  系统仍能运行

分区容错保证:
- 网络分区是必然发生的
- 系统必须能够处理分区情况
- 分区时系统不能停止服务

CAP 权衡

CAP 组合分析:

CP 系统(一致性 + 分区容错):
┌─────────┐                     ┌─────────┐
│  节点 A  │      网络分区       │  节点 B  │
│  X = 1  │ ╳ ╳ ╳ ╳ ╳ ╳ ╳ ╳ ╳ │  X = ?  │
└─────────┘                     └─────────┘
     │                              │
     │ 可读可写                      │ 拒绝服务
     ▼                              ▼
  正常运行                      不可用

代表系统:Zookeeper、Etcd、HBase、Redis(Sentinel)
特点:数据一致,但可能不可用

AP 系统(可用性 + 分区容错):
┌─────────┐                     ┌─────────┐
│  节点 A  │      网络分区       │  节点 B  │
│  X = 1  │ ╳ ╳ ╳ ╳ ╳ ╳ ╳ ╳ ╳ │  X = 2  │
└─────────┘                     └─────────┘
     │                              │
     │ 可读可写                      │ 可读可写
     ▼                              ▼
  返回 X=1                       返回 X=2

代表系统:Cassandra、DynamoDB、Eureka、Nacos(AP 模式)
特点:系统可用,但数据可能不一致

CA 系统(一致性 + 可用性):
- 不存在于真正的分布式系统中
- 单机数据库:MySQL、PostgreSQL
- 分布式系统必须考虑网络分区

CAP 实践选择

# CP 系统示例:Zookeeper
class CPSystem:
    """CP 系统:优先保证一致性"""

    def __init__(self):
        self.data = {}
        self.leader = None
        self.followers = []

    async def write(self, key: str, value: str):
        """写入数据(需要多数节点确认)"""
        # 1. 只有 leader 可以处理写请求
        if not self.is_leader():
            raise NotLeaderException()

        # 2. 向所有 follower 发送写请求
        responses = await self.broadcast_write(key, value)

        # 3. 等待多数节点确认
        if self.has_quorum(responses):
            self.data[key] = value
            return "success"
        else:
            # 无法获得多数确认,写入失败
            raise UnavailableException("无法获得多数确认")

    async def read(self, key: str):
        """读取数据"""
        # CP 系统保证读到最新数据
        if key not in self.data:
            raise NotFoundException()
        return self.data[key]

# AP 系统示例:Cassandra
class APSystem:
    """AP 系统:优先保证可用性"""

    def __init__(self):
        self.nodes = []

    async def write(self, key: str, value: str, consistency_level: str = "ONE"):
        """写入数据"""
        # 写入任意可用节点
        success_count = 0
        for node in self.nodes:
            try:
                await node.write(key, value)
                success_count += 1
                if consistency_level == "ONE" and success_count >= 1:
                    return "success"
            except Exception:
                continue  # 忽略失败节点

        if success_count > 0:
            return "success"
        raise UnavailableException("所有节点不可用")

    async def read(self, key: str):
        """读取数据"""
        # 从任意可用节点读取
        for node in self.nodes:
            try:
                return await node.read(key)
            except Exception:
                continue

        raise UnavailableException("所有节点不可用")

2.2 BASE 理论

BASE 理论是对 CAP 定理的补充,提供了更实际的分布式系统设计思路。

定义

BASE 理论:

BA - Basically Available(基本可用)
     系统出现故障时,允许损失部分可用性
     例如:响应时间增加、功能降级

S - Soft State(软状态)
    允许系统存在中间状态
    中间状态不影响系统可用性
    例如:数据复制过程中的不一致

E - Eventually Consistent(最终一致性)
    经过一段时间后,所有副本最终达到一致
    不要求实时一致
    例如:DNS 缓存更新

最终一致性详解

最终一致性的时间线:

时刻 T0: 写入 X=1 到节点 A
┌─────────┐                     ┌─────────┐
│  节点 A  │                     │  节点 B  │
│  X = 1  │                     │  X = 0  │ ← 旧值
└─────────┘                     └─────────┘

时刻 T1: 数据同步中
┌─────────┐                     ┌─────────┐
│  节点 A  │  ──── 同步中 ────→ │  节点 B  │
│  X = 1  │                     │  X = ?  │
└─────────┘                     └─────────┘

时刻 T2: 同步完成
┌─────────┐                     ┌─────────┐
│  节点 A  │                     │  节点 B  │
│  X = 1  │                     │  X = 1  │ ← 最终一致
└─────────┘                     └─────────┘

最终一致性保证:
- 存在不一致窗口期
- 最终所有副本会一致
- 不一致窗口取决于:延迟、负载、复制策略

一致性级别

一致性级别(从强到弱):

1. 强一致性(Strong Consistency)
   - 写操作完成后,立即对所有读操作可见
   - 实现成本高,性能影响大
   - 例:银行转账

2. 顺序一致性(Sequential Consistency)
   - 所有进程看到相同的操作顺序
   - 不保证实时可见
   - 例:分布式锁

3. 因果一致性(Causal Consistency)
   - 有因果关系的操作必须顺序一致
   - 无因果关系的操作可以乱序
   - 例:评论系统

4. 最终一致性(Eventual Consistency)
   - 最终所有副本一致
   - 不保证何时一致
   - 例:DNS、社交媒体

5. 弱一致性(Weak Consistency)
   - 不保证任何一致性
   - 例:本地缓存

一致性级别实现

from enum import Enum
from typing import List, Optional
import asyncio

class ConsistencyLevel(Enum):
    """一致性级别"""
    STRONG = "strong"          # 强一致性
    EVENTUAL = "eventual"      # 最终一致性
    QUORUM = "quorum"          # 法定人数一致性

class DistributedDataStore:
    """分布式数据存储"""

    def __init__(self, nodes: List['DataNode']):
        self.nodes = nodes
        self.replication_factor = len(nodes)

    async def write(
        self,
        key: str,
        value: str,
        consistency: ConsistencyLevel = ConsistencyLevel.EVENTUAL
    ):
        """写入数据"""
        if consistency == ConsistencyLevel.STRONG:
            # 强一致性:同步写入所有节点
            tasks = [node.write(key, value) for node in self.nodes]
            await asyncio.gather(*tasks)
            return True

        elif consistency == ConsistencyLevel.QUORUM:
            # 法定人数一致性:写入多数节点
            quorum = self.replication_factor // 2 + 1
            success_count = 0

            for node in self.nodes:
                try:
                    await node.write(key, value)
                    success_count += 1
                    if success_count >= quorum:
                        return True
                except Exception:
                    continue

            raise WriteFailedException("无法达到法定人数")

        else:
            # 最终一致性:异步复制
            # 先写入一个节点,然后异步复制
            await self.nodes[0].write(key, value)
            asyncio.create_task(self._replicate(key, value))
            return True

    async def read(
        self,
        key: str,
        consistency: ConsistencyLevel = ConsistencyLevel.EVENTUAL
    ):
        """读取数据"""
        if consistency == ConsistencyLevel.STRONG:
            # 强一致性:读取所有节点,返回最新值
            tasks = [node.read(key) for node in self.nodes]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # 返回时间戳最新的值
            return self._get_latest_value(results)

        elif consistency == ConsistencyLevel.QUORUM:
            # 法定人数一致性:读取多数节点
            quorum = self.replication_factor // 2 + 1
            results = []

            for node in self.nodes:
                try:
                    value = await node.read(key)
                    results.append(value)
                    if len(results) >= quorum:
                        return self._get_latest_value(results)
                except Exception:
                    continue

            raise ReadFailedException("无法达到法定人数")

        else:
            # 最终一致性:读取任意节点
            for node in self.nodes:
                try:
                    return await node.read(key)
                except Exception:
                    continue

            raise NotFoundException(key)

    async def _replicate(self, key: str, value: str):
        """异步复制"""
        for node in self.nodes[1:]:
            try:
                await node.write(key, value)
            except Exception as e:
                print(f"复制失败: {node}, error: {e}")

2.3 CAP 与 BASE 的关系

CAP 与 BASE 的关系:

CAP 定理:
- 理论指导
- 三选二的硬性约束
- 强调极端情况下的取舍

BASE 理论:
- 实践指导
- CAP 中 AP 方案的延伸
- 强调实际可用的中间状态

关系图:

                    CAP 定理
           ┌──────────┼──────────┐
           │          │          │
          CP         CA         AP
           │          │          │
           │     (不存在于      │
           │      分布式)       │
           │                    │
           ▼                    ▼
      强一致性              BASE 理论
      (实时一致)               │
                    ┌─────────┼─────────┐
                    │         │         │
                基本可用   软状态   最终一致

2.4 实际应用场景

场景选择

场景与一致性选择:

强一致性场景:
├── 金融交易
│   └── 转账、支付
├── 库存管理
│   └── 扣减库存
├── 订单系统
│   └── 订单状态
└── 分布式锁
    └── 互斥访问

最终一致性场景:
├── 社交媒体
│   ├── 点赞数
│   ├── 评论数
│   └── 关注关系
├── 内容分发
│   ├── CDN 缓存
│   └── DNS 解析
├── 日志系统
│   └── 日志收集
└── 推荐系统
    └── 用户画像

系统设计示例

# 订单系统:强一致性
class OrderService:
    """订单服务 - 需要强一致性"""

    def __init__(self, db, inventory_client, payment_client):
        self.db = db
        self.inventory = inventory_client
        self.payment = payment_client

    async def create_order(self, order_data: dict):
        """创建订单"""
        # 使用分布式事务保证一致性
        async with self.db.transaction():
            # 1. 创建订单
            order = await self.db.orders.create(order_data)

            # 2. 扣减库存(强一致性)
            await self.inventory.deduct(
                product_id=order.product_id,
                quantity=order.quantity,
                consistency="strong"
            )

            # 3. 处理支付
            await self.payment.process(order.id)

            return order

# 社交系统:最终一致性
class SocialService:
    """社交服务 - 可以最终一致性"""

    def __init__(self, db, message_queue):
        self.db = db
        self.mq = message_queue

    async def like_post(self, user_id: str, post_id: str):
        """点赞"""
        # 1. 直接写入本地数据库
        await self.db.likes.create(user_id, post_id)

        # 2. 异步更新点赞数(最终一致性)
        await self.mq.publish("post.liked", {
            "post_id": post_id,
            "user_id": user_id
        })

        return {"status": "liked"}

    async def get_like_count(self, post_id: str):
        """获取点赞数(可能不是最新)"""
        # 从缓存读取,可能存在延迟
        count = await self.db.cache.get(f"post:{post_id}:likes")
        if count is None:
            count = await self.db.likes.count(post_id)
            await self.db.cache.set(f"post:{post_id}:likes", count, ttl=60)
        return count

2.5 小结

本章介绍了分布式系统的核心理论:

  1. CAP 定理:一致性、可用性、分区容错三选二
  2. CP 系统优先一致性,AP 系统优先可用性
  3. BASE 理论是 CAP 的实践补充
  4. 最终一致性是大多数场景的选择
  5. 根据业务场景选择合适的一致性级别

思考题

  1. 为什么分布式系统必须考虑分区容错?
  2. 你的业务场景需要什么级别的一致性?
  3. 如何在一致性和性能之间取得平衡?

参考资料