跳转至

第五章:Watch 机制

Watch 概述

什么是 Watch?

Watch 是 etcd 提供的事件监听机制,当键发生变化时,etcd 会通知客户端。

Client                          etcd Server
  │                                    │
  │──── Watch("/config/app") ─────────►│
  │◄─── Watch Created ────────────────│
  │                                    │
  │         (键被修改)                  │
  │                                    │
  │◄─── Watch Event ──────────────────│
  │     (PUT: /config/app)            │
  │                                    │
  │         (键被删除)                  │
  │                                    │
  │◄─── Watch Event ──────────────────│
  │     (DELETE: /config/app)         │
  │                                    │

Watch 特性

1. 实时通知
   - 键变化时立即通知

2. 历史事件
   - 可以从指定版本开始监听

3. 多键监听
   - 支持前缀监听、范围监听

4. 可靠性
   - 断线重连后自动恢复

命令行 Watch

监听单个键

# 监听键变化
etcdctl watch /config/app

# 在另一个终端修改键
etcdctl put /config/app "new-value"
# 输出:
# PUT
# /config/app
# new-value

监听前缀

# 监听前缀
etcdctl watch /config/ --prefix

# 在另一个终端修改键
etcdctl put /config/database "mysql"
# 输出:
# PUT
# /config/database
# mysql

监听历史

# 从指定版本开始监听
etcdctl watch /config/app --rev=10

Go 客户端 Watch

监听单个键

import (
    "context"
    "fmt"
    clientv3 "go.etcd.io/etcd/client/v3"
)

func watchKey(cli *clientv3.Client, key string) {
    // 创建 Watch
    watchChan := cli.Watch(context.Background(), key)

    // 处理事件
    for resp := range watchChan {
        for _, event := range resp.Events {
            fmt.Printf("Type: %s, Key: %s, Value: %s\n",
                event.Type, event.Kv.Key, event.Kv.Value)
        }
    }
}

监听前缀

func watchPrefix(cli *clientv3.Client, prefix string) {
    // 创建前缀 Watch
    watchChan := cli.Watch(
        context.Background(),
        prefix,
        clientv3.WithPrefix(),
    )

    // 处理事件
    for resp := range watchChan {
        for _, event := range resp.Events {
            fmt.Printf("Type: %s, Key: %s, Value: %s\n",
                event.Type, event.Kv.Key, event.Kv.Value)
        }
    }
}

监听历史

func watchFromRevision(cli *clientv3.Client, key string, revision int64) {
    // 从指定版本开始监听
    watchChan := cli.Watch(
        context.Background(),
        key,
        clientv3.WithRev(revision),
    )

    // 处理事件
    for resp := range watchChan {
        for _, event := range resp.Events {
            fmt.Printf("Type: %s, Key: %s, Value: %s\n",
                event.Type, event.Kv.Key, event.Kv.Value)
        }
    }
}

监听多个键

func watchMultiple(cli *clientv3.Client, keys []string) {
    ctx := context.Background()

    // 创建多个 Watch
    for _, key := range keys {
        go func(k string) {
            watchChan := cli.Watch(ctx, k)
            for resp := range watchChan {
                for _, event := range resp.Events {
                    fmt.Printf("Key: %s, Type: %s\n", k, event.Type)
                }
            }
        }(key)
    }
}

Python 客户端 Watch

监听键

import etcd3

etcd = etcd3.client()

# 监听单个键
events_iterator, cancel = etcd.watch('/config/app')
for event in events_iterator:
    print(f"Type: {event.type}, Key: {event.key.decode()}, Value: {event.value.decode()}")

监听前缀

# 监听前缀
events_iterator, cancel = etcd.watch_prefix('/config/')
for event in events_iterator:
    print(f"Type: {event.type}, Key: {event.key.decode()}, Value: {event.value.decode()}")

取消监听

# 取消监听
cancel()

Watch 最佳实践

1. 处理断线重连

func watchWithRetry(cli *clientv3.Client, key string) {
    for {
        watchChan := cli.Watch(context.Background(), key)

        for resp := range watchChan {
            if resp.Err() != nil {
                // 断线重连
                fmt.Printf("Watch error: %v, reconnecting...\n", resp.Err())
                break
            }

            for _, event := range resp.Events {
                handleEvent(event)
            }
        }
    }
}

2. 处理历史事件

func watchWithHistory(cli *clientv3.Client, key string) {
    // 获取当前版本
    getResp, _ := cli.Get(context.Background(), key)
    revision := getResp.Header.Revision

    // 从当前版本开始监听
    watchChan := cli.Watch(
        context.Background(),
        key,
        clientv3.WithRev(revision),
    )

    for resp := range watchChan {
        for _, event := range resp.Events {
            handleEvent(event)
        }
    }
}

3. 批量处理事件

func watchBatch(cli *clientv3.Client, prefix string) {
    watchChan := cli.Watch(
        context.Background(),
        prefix,
        clientv3.WithPrefix(),
    )

    batch := make([]*clientv3.Event, 0)
    ticker := time.NewTicker(100 * time.Millisecond)

    for {
        select {
        case resp := <-watchChan:
            for _, event := range resp.Events {
                batch = append(batch, event)
            }
        case <-ticker.C:
            if len(batch) > 0 {
                processBatch(batch)
                batch = make([]*clientv3.Event, 0)
            }
        }
    }
}

小结

Watch 机制要点:

  • Watch 特性:实时通知、历史事件、多键监听
  • 命令行 Watch:监听单个键、监听前缀
  • Go 客户端 Watch:监听键、监听前缀、监听历史
  • Python 客户端 Watch:etcd3 库
  • 最佳实践:断线重连、历史事件、批量处理

下一章我们将学习集群部署。