第五章:Watch 机制¶
Watch 概述¶
什么是 Watch?¶
Watch 是 ZooKeeper 提供的事件监听机制,当 ZNode 发生变化时,ZooKeeper 会通知客户端。
Client ZooKeeper Server
│ │
│──── getData("/config", watch=true)│
│◄─── data + 注册 Watch ────────────│
│ │
│ (节点数据变化) │
│ │
│◄─── Watch Event ──────────────────│
│ (NodeDataChanged) │
│ │
│──── getData("/config", watch=true)│
│◄─── data + 注册 Watch ────────────│
│ │
Watch 特性¶
1. 一次性触发
- Watch 被触发后,需要重新注册
2. 异步通知
- 通知是异步发送的
3. 顺序保证
- 客户端先收到 Watch 通知,再看到数据变化
4. 轻量级
- 只通知事件类型,不包含变化后的数据
Watch 类型¶
数据 Watch¶
// 监听节点数据变化
zk.getData("/config/app", new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDataChanged) {
System.out.println("节点数据已变化");
// 重新注册 Watch
try {
byte[] data = zk.getData("/config/app", this, null);
System.out.println("新数据: " + new String(data));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, null);
子节点 Watch¶
// 监听子节点变化
zk.getChildren("/services", new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeChildrenChanged) {
System.out.println("子节点已变化");
// 重新注册 Watch
try {
List<String> children = zk.getChildren("/services", this);
System.out.println("子节点: " + children);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
存在性 Watch¶
// 监听节点是否存在
zk.exists("/config/app", new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case NodeCreated:
System.out.println("节点已创建");
break;
case NodeDeleted:
System.out.println("节点已删除");
break;
case NodeDataChanged:
System.out.println("节点数据已变化");
break;
}
}
});
事件类型¶
| 事件类型 | 说明 | 触发方法 |
|---|---|---|
| NodeCreated | 节点创建 | exists() |
| NodeDeleted | 节点删除 | exists(), getData(), getChildren() |
| NodeDataChanged | 数据变化 | exists(), getData() |
| NodeChildrenChanged | 子节点变化 | getChildren() |
Curator Watch¶
NodeCache¶
import org.apache.curator.framework.recipes.cache.NodeCache;
// 创建 NodeCache(监听节点数据变化)
NodeCache nodeCache = new NodeCache(client, "/config/app");
nodeCache.start();
// 添加监听器
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData data = nodeCache.getCurrentData();
if (data != null) {
System.out.println("数据: " + new String(data.getData()));
} else {
System.out.println("节点已删除");
}
}
});
// 关闭
nodeCache.close();
PathChildrenCache¶
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
// 创建 PathChildrenCache(监听子节点变化)
PathChildrenCache childrenCache = new PathChildrenCache(
client,
"/services",
true
);
childrenCache.start();
// 添加监听器
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("子节点添加: " + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("子节点更新: " + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("子节点删除: " + event.getData().getPath());
break;
}
}
});
// 关闭
childrenCache.close();
TreeCache¶
import org.apache.curator.framework.recipes.cache.TreeCache;
// 创建 TreeCache(监听节点及子节点)
TreeCache treeCache = new TreeCache(client, "/config");
treeCache.start();
// 添加监听器
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) {
switch (event.getType()) {
case NODE_ADDED:
System.out.println("节点添加: " + event.getData().getPath());
break;
case NODE_UPDATED:
System.out.println("节点更新: " + event.getData().getPath());
break;
case NODE_REMOVED:
System.out.println("节点删除: " + event.getData().getPath());
break;
}
}
});
// 关闭
treeCache.close();
Python Watch¶
from kazoo.client import KazooClient
zk = KazooClient(hosts='localhost:2181')
zk.start()
# 数据 Watch
@zk.DataWatch('/config/app')
def watch_data(data, stat):
if data is not None:
print(f"数据: {data.decode()}")
else:
print("节点已删除")
# 子节点 Watch
@zk.ChildrenWatch('/services')
def watch_children(children):
print(f"子节点: {children}")
# 保持运行
import time
while True:
time.sleep(1)
最佳实践¶
1. 重新注册 Watch¶
// Watch 是一次性的,需要重新注册
public void watchNode(String path) {
zk.getData(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 处理事件
handleEvent(event);
// 重新注册 Watch
watchNode(path);
}
}, null);
}
2. 处理连接丢失¶
// 处理会话过期
zk.register(new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Expired) {
// 重新连接
reconnect();
// 重新注册所有 Watch
registerAllWatches();
}
}
});
3. 批量监听¶
// 使用 TreeCache 批量监听
TreeCache treeCache = new TreeCache(client, "/config");
treeCache.start();
// 统一处理所有事件
treeCache.getListenable().addListener((client, event) -> {
handleConfigChange(event);
});
小结¶
Watch 机制要点:
- Watch 特性:一次性触发、异步通知、顺序保证
- Watch 类型:数据 Watch、子节点 Watch、存在性 Watch
- 事件类型:NodeCreated、NodeDeleted、NodeDataChanged、NodeChildrenChanged
- Curator Cache:NodeCache、PathChildrenCache、TreeCache
- 最佳实践:重新注册、处理连接丢失、批量监听
下一章我们将学习集群部署。