跳转至

第四章:客户端操作

命令行客户端

连接服务

# 连接本地 ZooKeeper
zkCli.sh

# 连接远程 ZooKeeper
zkCli.sh -server 192.168.1.101:2181

# 连接集群
zkCli.sh -server node1:2181,node2:2181,node3:2181

基本操作

# 查看根节点
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]

# 创建节点
[zk: localhost:2181(CONNECTED) 1] create /config "config-data"
Created /config

# 创建子节点
[zk: localhost:2181(CONNECTED) 2] create /config/app "app-config"
Created /config/app

# 获取节点数据
[zk: localhost:2181(CONNECTED) 3] get /config/app
app-config

# 获取节点详情
[zk: localhost:2181(CONNECTED) 4] get -s /config/app
app-config
cZxid = 0x100000003
ctime = Fri Mar 27 10:00:00 CST 2024
mZxid = 0x100000003
mtime = Fri Mar 27 10:00:00 CST 2024
pZxid = 0x100000003
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 0

# 更新节点数据
[zk: localhost:2181(CONNECTED) 5] set /config/app "new-config"

# 删除节点
[zk: localhost:2181(CONNECTED) 6] delete /config/app

# 递归删除
[zk: localhost:2181(CONNECTED) 7] deleteall /config

临时节点

# 创建临时节点
[zk: localhost:2181(CONNECTED) 8] create -e /temp "temp-data"
Created /temp

# 创建顺序节点
[zk: localhost:2181(CONNECTED) 9] create -s /seq "seq-data"
Created /seq0000000001

# 创建临时顺序节点
[zk: localhost:2181(CONNECTED) 10] create -e -s /eseq "eseq-data"
Created /eseq0000000002

查看节点信息

# 查看节点状态
[zk: localhost:2181(CONNECTED) 11] stat /config
cZxid = 0x100000003
ctime = Fri Mar 27 10:00:00 CST 2024
mZxid = 0x100000003
mtime = Fri Mar 27 10:00:00 CST 2024
pZxid = 0x100000004
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 1

# 查看子节点
[zk: localhost:2181(CONNECTED) 12] ls /config
[app]

# 查看子节点详情
[zk: localhost:2181(CONNECTED) 13] ls -s /config
[app]cZxid = 0x100000003
...

Java 客户端

添加依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

连接 ZooKeeper

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;

// 创建连接
ZooKeeper zk = new ZooKeeper(
    "localhost:2181",  // 服务器地址
    30000,             // 会话超时时间
    new Watcher() {    // 默认 Watcher
        @Override
        public void process(WatchedEvent event) {
            System.out.println("事件: " + event);
        }
    }
);

// 等待连接
while (zk.getState() != ZooKeeper.States.CONNECTED) {
    Thread.sleep(100);
}

System.out.println("连接成功");

CRUD 操作

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

// 创建节点
String path = zk.create(
    "/config/app",                    // 路径
    "config-data".getBytes(),         // 数据
    ZooDefs.Ids.OPEN_ACL_UNSAFE,      // ACL
    CreateMode.PERSISTENT             // 节点类型
);

// 读取数据
Stat stat = new Stat();
byte[] data = zk.getData("/config/app", false, stat);
System.out.println("数据: " + new String(data));

// 更新数据
zk.setData("/config/app", "new-data".getBytes(), stat.getVersion());

// 删除节点
zk.delete("/config/app", -1);

// 获取子节点
List<String> children = zk.getChildren("/", false);
for (String child : children) {
    System.out.println("子节点: " + child);
}

// 检查节点是否存在
Stat exists = zk.exists("/config/app", false);
if (exists != null) {
    System.out.println("节点存在");
}

Curator 客户端

添加依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>

连接 ZooKeeper

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
    .connectString("localhost:2181")
    .sessionTimeoutMs(30000)
    .connectionTimeoutMs(10000)
    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
    .build();

// 启动客户端
client.start();

// 关闭客户端
client.close();

CRUD 操作

// 创建节点
client.create()
    .creatingParentsIfNeeded()
    .withMode(CreateMode.PERSISTENT)
    .forPath("/config/app", "config-data".getBytes());

// 读取数据
byte[] data = client.getData().forPath("/config/app");
System.out.println("数据: " + new String(data));

// 读取数据(带状态)
Stat stat = new Stat();
data = client.getData().storingStatIn(stat).forPath("/config/app");

// 更新数据
client.setData()
    .withVersion(stat.getVersion())
    .forPath("/config/app", "new-data".getBytes());

// 删除节点
client.delete()
    .guaranteed()           // 保证删除成功
    .deletingChildrenIfNeeded()  // 递归删除子节点
    .withVersion(-1)
    .forPath("/config/app");

// 检查节点是否存在
Stat exists = client.checkExists().forPath("/config/app");
if (exists != null) {
    System.out.println("节点存在");
}

// 获取子节点
List<String> children = client.getChildren().forPath("/");

异步操作

// 异步创建
client.create()
    .creatingParentsIfNeeded()
    .inBackground(new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) {
            System.out.println("创建结果: " + event.getResultCode());
            System.out.println("路径: " + event.getPath());
        }
    })
    .forPath("/config/app", "data".getBytes());

// 异步读取
client.getData()
    .inBackground((client, event) -> {
        System.out.println("数据: " + new String(event.getData()));
    })
    .forPath("/config/app");

Python 客户端

安装依赖

pip install kazoo

连接 ZooKeeper

from kazoo.client import KazooClient

# 创建客户端
zk = KazooClient(hosts='localhost:2181')
zk.start()

# 关闭连接
zk.stop()

CRUD 操作

# 创建节点
zk.create('/config/app', b'config-data')

# 创建持久节点
zk.create('/config/database', b'db-config', ephemeral=False)

# 创建临时节点
zk.create('/services/user', b'192.168.1.1:8080', ephemeral=True)

# 读取数据
data, stat = zk.get('/config/app')
print(f"数据: {data.decode()}")

# 更新数据
zk.set('/config/app', b'new-data')

# 删除节点
zk.delete('/config/app')

# 递归删除
zk.delete('/config', recursive=True)

# 检查节点是否存在
if zk.exists('/config/app'):
    print("节点存在")

# 获取子节点
children = zk.get_children('/')
for child in children:
    print(f"子节点: {child}")

小结

客户端操作要点:

  • 命令行:zkCli.sh 基本操作
  • Java 客户端:原生 API、Curator 框架
  • Python 客户端:Kazoo 库
  • 异步操作:回调函数

下一章我们将学习 Watch 机制。