跳转至

第五章:Watch 机制

Watch 概述

什么是 Watch?

Watch 是 ZooKeeper 提供的事件监听机制,当 ZNode 发生变化时,ZooKeeper 会通知客户端。

Client                          ZooKeeper Server
  │                                    │
  │──── getData("/config", watch=true)│
  │◄─── data + 注册 Watch ────────────│
  │                                    │
  │         (节点数据变化)              │
  │                                    │
  │◄─── Watch Event ──────────────────│
  │     (NodeDataChanged)             │
  │                                    │
  │──── getData("/config", watch=true)│
  │◄─── data + 注册 Watch ────────────│
  │                                    │

Watch 特性

1. 一次性触发
   - Watch 被触发后,需要重新注册

2. 异步通知
   - 通知是异步发送的

3. 顺序保证
   - 客户端先收到 Watch 通知,再看到数据变化

4. 轻量级
   - 只通知事件类型,不包含变化后的数据

Watch 类型

数据 Watch

// 监听节点数据变化
zk.getData("/config/app", new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == EventType.NodeDataChanged) {
            System.out.println("节点数据已变化");
            // 重新注册 Watch
            try {
                byte[] data = zk.getData("/config/app", this, null);
                System.out.println("新数据: " + new String(data));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}, null);

子节点 Watch

// 监听子节点变化
zk.getChildren("/services", new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == EventType.NodeChildrenChanged) {
            System.out.println("子节点已变化");
            // 重新注册 Watch
            try {
                List<String> children = zk.getChildren("/services", this);
                System.out.println("子节点: " + children);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
});

存在性 Watch

// 监听节点是否存在
zk.exists("/config/app", new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case NodeCreated:
                System.out.println("节点已创建");
                break;
            case NodeDeleted:
                System.out.println("节点已删除");
                break;
            case NodeDataChanged:
                System.out.println("节点数据已变化");
                break;
        }
    }
});

事件类型

事件类型 说明 触发方法
NodeCreated 节点创建 exists()
NodeDeleted 节点删除 exists(), getData(), getChildren()
NodeDataChanged 数据变化 exists(), getData()
NodeChildrenChanged 子节点变化 getChildren()

Curator Watch

NodeCache

import org.apache.curator.framework.recipes.cache.NodeCache;

// 创建 NodeCache(监听节点数据变化)
NodeCache nodeCache = new NodeCache(client, "/config/app");
nodeCache.start();

// 添加监听器
nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        ChildData data = nodeCache.getCurrentData();
        if (data != null) {
            System.out.println("数据: " + new String(data.getData()));
        } else {
            System.out.println("节点已删除");
        }
    }
});

// 关闭
nodeCache.close();

PathChildrenCache

import org.apache.curator.framework.recipes.cache.PathChildrenCache;

// 创建 PathChildrenCache(监听子节点变化)
PathChildrenCache childrenCache = new PathChildrenCache(
    client, 
    "/services", 
    true
);
childrenCache.start();

// 添加监听器
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
        switch (event.getType()) {
            case CHILD_ADDED:
                System.out.println("子节点添加: " + event.getData().getPath());
                break;
            case CHILD_UPDATED:
                System.out.println("子节点更新: " + event.getData().getPath());
                break;
            case CHILD_REMOVED:
                System.out.println("子节点删除: " + event.getData().getPath());
                break;
        }
    }
});

// 关闭
childrenCache.close();

TreeCache

import org.apache.curator.framework.recipes.cache.TreeCache;

// 创建 TreeCache(监听节点及子节点)
TreeCache treeCache = new TreeCache(client, "/config");
treeCache.start();

// 添加监听器
treeCache.getListenable().addListener(new TreeCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, TreeCacheEvent event) {
        switch (event.getType()) {
            case NODE_ADDED:
                System.out.println("节点添加: " + event.getData().getPath());
                break;
            case NODE_UPDATED:
                System.out.println("节点更新: " + event.getData().getPath());
                break;
            case NODE_REMOVED:
                System.out.println("节点删除: " + event.getData().getPath());
                break;
        }
    }
});

// 关闭
treeCache.close();

Python Watch

from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')
zk.start()

# 数据 Watch
@zk.DataWatch('/config/app')
def watch_data(data, stat):
    if data is not None:
        print(f"数据: {data.decode()}")
    else:
        print("节点已删除")

# 子节点 Watch
@zk.ChildrenWatch('/services')
def watch_children(children):
    print(f"子节点: {children}")

# 保持运行
import time
while True:
    time.sleep(1)

最佳实践

1. 重新注册 Watch

// Watch 是一次性的,需要重新注册
public void watchNode(String path) {
    zk.getData(path, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            // 处理事件
            handleEvent(event);

            // 重新注册 Watch
            watchNode(path);
        }
    }, null);
}

2. 处理连接丢失

// 处理会话过期
zk.register(new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == KeeperState.Expired) {
            // 重新连接
            reconnect();

            // 重新注册所有 Watch
            registerAllWatches();
        }
    }
});

3. 批量监听

// 使用 TreeCache 批量监听
TreeCache treeCache = new TreeCache(client, "/config");
treeCache.start();

// 统一处理所有事件
treeCache.getListenable().addListener((client, event) -> {
    handleConfigChange(event);
});

小结

Watch 机制要点:

  • Watch 特性:一次性触发、异步通知、顺序保证
  • Watch 类型:数据 Watch、子节点 Watch、存在性 Watch
  • 事件类型:NodeCreated、NodeDeleted、NodeDataChanged、NodeChildrenChanged
  • Curator Cache:NodeCache、PathChildrenCache、TreeCache
  • 最佳实践:重新注册、处理连接丢失、批量监听

下一章我们将学习集群部署。