第五章:Watch 机制¶
Watch 概述¶
什么是 Watch?¶
Watch 是 etcd 提供的事件监听机制,当键发生变化时,etcd 会通知客户端。
Client etcd Server
│ │
│──── Watch("/config/app") ─────────►│
│◄─── Watch Created ────────────────│
│ │
│ (键被修改) │
│ │
│◄─── Watch Event ──────────────────│
│ (PUT: /config/app) │
│ │
│ (键被删除) │
│ │
│◄─── Watch Event ──────────────────│
│ (DELETE: /config/app) │
│ │
Watch 特性¶
命令行 Watch¶
监听单个键¶
# 监听键变化
etcdctl watch /config/app
# 在另一个终端修改键
etcdctl put /config/app "new-value"
# 输出:
# PUT
# /config/app
# new-value
监听前缀¶
# 监听前缀
etcdctl watch /config/ --prefix
# 在另一个终端修改键
etcdctl put /config/database "mysql"
# 输出:
# PUT
# /config/database
# mysql
监听历史¶
Go 客户端 Watch¶
监听单个键¶
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
)
func watchKey(cli *clientv3.Client, key string) {
// 创建 Watch
watchChan := cli.Watch(context.Background(), key)
// 处理事件
for resp := range watchChan {
for _, event := range resp.Events {
fmt.Printf("Type: %s, Key: %s, Value: %s\n",
event.Type, event.Kv.Key, event.Kv.Value)
}
}
}
监听前缀¶
func watchPrefix(cli *clientv3.Client, prefix string) {
// 创建前缀 Watch
watchChan := cli.Watch(
context.Background(),
prefix,
clientv3.WithPrefix(),
)
// 处理事件
for resp := range watchChan {
for _, event := range resp.Events {
fmt.Printf("Type: %s, Key: %s, Value: %s\n",
event.Type, event.Kv.Key, event.Kv.Value)
}
}
}
监听历史¶
func watchFromRevision(cli *clientv3.Client, key string, revision int64) {
// 从指定版本开始监听
watchChan := cli.Watch(
context.Background(),
key,
clientv3.WithRev(revision),
)
// 处理事件
for resp := range watchChan {
for _, event := range resp.Events {
fmt.Printf("Type: %s, Key: %s, Value: %s\n",
event.Type, event.Kv.Key, event.Kv.Value)
}
}
}
监听多个键¶
func watchMultiple(cli *clientv3.Client, keys []string) {
ctx := context.Background()
// 创建多个 Watch
for _, key := range keys {
go func(k string) {
watchChan := cli.Watch(ctx, k)
for resp := range watchChan {
for _, event := range resp.Events {
fmt.Printf("Key: %s, Type: %s\n", k, event.Type)
}
}
}(key)
}
}
Python 客户端 Watch¶
监听键¶
import etcd3
etcd = etcd3.client()
# 监听单个键
events_iterator, cancel = etcd.watch('/config/app')
for event in events_iterator:
print(f"Type: {event.type}, Key: {event.key.decode()}, Value: {event.value.decode()}")
监听前缀¶
# 监听前缀
events_iterator, cancel = etcd.watch_prefix('/config/')
for event in events_iterator:
print(f"Type: {event.type}, Key: {event.key.decode()}, Value: {event.value.decode()}")
取消监听¶
Watch 最佳实践¶
1. 处理断线重连¶
func watchWithRetry(cli *clientv3.Client, key string) {
for {
watchChan := cli.Watch(context.Background(), key)
for resp := range watchChan {
if resp.Err() != nil {
// 断线重连
fmt.Printf("Watch error: %v, reconnecting...\n", resp.Err())
break
}
for _, event := range resp.Events {
handleEvent(event)
}
}
}
}
2. 处理历史事件¶
func watchWithHistory(cli *clientv3.Client, key string) {
// 获取当前版本
getResp, _ := cli.Get(context.Background(), key)
revision := getResp.Header.Revision
// 从当前版本开始监听
watchChan := cli.Watch(
context.Background(),
key,
clientv3.WithRev(revision),
)
for resp := range watchChan {
for _, event := range resp.Events {
handleEvent(event)
}
}
}
3. 批量处理事件¶
func watchBatch(cli *clientv3.Client, prefix string) {
watchChan := cli.Watch(
context.Background(),
prefix,
clientv3.WithPrefix(),
)
batch := make([]*clientv3.Event, 0)
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case resp := <-watchChan:
for _, event := range resp.Events {
batch = append(batch, event)
}
case <-ticker.C:
if len(batch) > 0 {
processBatch(batch)
batch = make([]*clientv3.Event, 0)
}
}
}
}
小结¶
Watch 机制要点:
- Watch 特性:实时通知、历史事件、多键监听
- 命令行 Watch:监听单个键、监听前缀
- Go 客户端 Watch:监听键、监听前缀、监听历史
- Python 客户端 Watch:etcd3 库
- 最佳实践:断线重连、历史事件、批量处理
下一章我们将学习集群部署。