第四章:客户端操作¶
命令行客户端¶
连接服务¶
# 连接本地 ZooKeeper
zkCli.sh
# 连接远程 ZooKeeper
zkCli.sh -server 192.168.1.101:2181
# 连接集群
zkCli.sh -server node1:2181,node2:2181,node3:2181
基本操作¶
# 查看根节点
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
# 创建节点
[zk: localhost:2181(CONNECTED) 1] create /config "config-data"
Created /config
# 创建子节点
[zk: localhost:2181(CONNECTED) 2] create /config/app "app-config"
Created /config/app
# 获取节点数据
[zk: localhost:2181(CONNECTED) 3] get /config/app
app-config
# 获取节点详情
[zk: localhost:2181(CONNECTED) 4] get -s /config/app
app-config
cZxid = 0x100000003
ctime = Fri Mar 27 10:00:00 CST 2024
mZxid = 0x100000003
mtime = Fri Mar 27 10:00:00 CST 2024
pZxid = 0x100000003
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 0
# 更新节点数据
[zk: localhost:2181(CONNECTED) 5] set /config/app "new-config"
# 删除节点
[zk: localhost:2181(CONNECTED) 6] delete /config/app
# 递归删除
[zk: localhost:2181(CONNECTED) 7] deleteall /config
临时节点¶
# 创建临时节点
[zk: localhost:2181(CONNECTED) 8] create -e /temp "temp-data"
Created /temp
# 创建顺序节点
[zk: localhost:2181(CONNECTED) 9] create -s /seq "seq-data"
Created /seq0000000001
# 创建临时顺序节点
[zk: localhost:2181(CONNECTED) 10] create -e -s /eseq "eseq-data"
Created /eseq0000000002
查看节点信息¶
# 查看节点状态
[zk: localhost:2181(CONNECTED) 11] stat /config
cZxid = 0x100000003
ctime = Fri Mar 27 10:00:00 CST 2024
mZxid = 0x100000003
mtime = Fri Mar 27 10:00:00 CST 2024
pZxid = 0x100000004
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 1
# 查看子节点
[zk: localhost:2181(CONNECTED) 12] ls /config
[app]
# 查看子节点详情
[zk: localhost:2181(CONNECTED) 13] ls -s /config
[app]cZxid = 0x100000003
...
Java 客户端¶
添加依赖¶
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
连接 ZooKeeper¶
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
// 创建连接
ZooKeeper zk = new ZooKeeper(
"localhost:2181", // 服务器地址
30000, // 会话超时时间
new Watcher() { // 默认 Watcher
@Override
public void process(WatchedEvent event) {
System.out.println("事件: " + event);
}
}
);
// 等待连接
while (zk.getState() != ZooKeeper.States.CONNECTED) {
Thread.sleep(100);
}
System.out.println("连接成功");
CRUD 操作¶
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
// 创建节点
String path = zk.create(
"/config/app", // 路径
"config-data".getBytes(), // 数据
ZooDefs.Ids.OPEN_ACL_UNSAFE, // ACL
CreateMode.PERSISTENT // 节点类型
);
// 读取数据
Stat stat = new Stat();
byte[] data = zk.getData("/config/app", false, stat);
System.out.println("数据: " + new String(data));
// 更新数据
zk.setData("/config/app", "new-data".getBytes(), stat.getVersion());
// 删除节点
zk.delete("/config/app", -1);
// 获取子节点
List<String> children = zk.getChildren("/", false);
for (String child : children) {
System.out.println("子节点: " + child);
}
// 检查节点是否存在
Stat exists = zk.exists("/config/app", false);
if (exists != null) {
System.out.println("节点存在");
}
Curator 客户端¶
添加依赖¶
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
连接 ZooKeeper¶
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(30000)
.connectionTimeoutMs(10000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
// 启动客户端
client.start();
// 关闭客户端
client.close();
CRUD 操作¶
// 创建节点
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/config/app", "config-data".getBytes());
// 读取数据
byte[] data = client.getData().forPath("/config/app");
System.out.println("数据: " + new String(data));
// 读取数据(带状态)
Stat stat = new Stat();
data = client.getData().storingStatIn(stat).forPath("/config/app");
// 更新数据
client.setData()
.withVersion(stat.getVersion())
.forPath("/config/app", "new-data".getBytes());
// 删除节点
client.delete()
.guaranteed() // 保证删除成功
.deletingChildrenIfNeeded() // 递归删除子节点
.withVersion(-1)
.forPath("/config/app");
// 检查节点是否存在
Stat exists = client.checkExists().forPath("/config/app");
if (exists != null) {
System.out.println("节点存在");
}
// 获取子节点
List<String> children = client.getChildren().forPath("/");
异步操作¶
// 异步创建
client.create()
.creatingParentsIfNeeded()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) {
System.out.println("创建结果: " + event.getResultCode());
System.out.println("路径: " + event.getPath());
}
})
.forPath("/config/app", "data".getBytes());
// 异步读取
client.getData()
.inBackground((client, event) -> {
System.out.println("数据: " + new String(event.getData()));
})
.forPath("/config/app");
Python 客户端¶
安装依赖¶
连接 ZooKeeper¶
from kazoo.client import KazooClient
# 创建客户端
zk = KazooClient(hosts='localhost:2181')
zk.start()
# 关闭连接
zk.stop()
CRUD 操作¶
# 创建节点
zk.create('/config/app', b'config-data')
# 创建持久节点
zk.create('/config/database', b'db-config', ephemeral=False)
# 创建临时节点
zk.create('/services/user', b'192.168.1.1:8080', ephemeral=True)
# 读取数据
data, stat = zk.get('/config/app')
print(f"数据: {data.decode()}")
# 更新数据
zk.set('/config/app', b'new-data')
# 删除节点
zk.delete('/config/app')
# 递归删除
zk.delete('/config', recursive=True)
# 检查节点是否存在
if zk.exists('/config/app'):
print("节点存在")
# 获取子节点
children = zk.get_children('/')
for child in children:
print(f"子节点: {child}")
小结¶
客户端操作要点:
- 命令行:zkCli.sh 基本操作
- Java 客户端:原生 API、Curator 框架
- Python 客户端:Kazoo 库
- 异步操作:回调函数
下一章我们将学习 Watch 机制。