第七章：应用场景
配置中心
架构设计
┌─────────────────────────────────────────────────────────────┐
│ 配置中心架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 应用服务 │ │
│ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │
│ │ │ Service A │ │ Service B │ │ Service C │ │ │
│ │ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │ │
│ │ │ │ │ │ │
│ │ └─────────────┼─────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌───────────────┐ │ │
│ │ │ Watch 配置变化│ │ │
│ │ └───────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ ZooKeeper │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ /config │ │ │
│ │ │ ├── database/mysql │ │ │
│ │ │ ├── database/redis │ │ │
│ │ │ ├── app/timeout │ │ │
│ │ │ └── app/feature-flags │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
实现
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * Minimal ZooKeeper-backed configuration center built on Curator.
 *
 * <p>Each configuration key is stored as a znode under {@code /config/}; values are
 * encoded and decoded as UTF-8 so that the result does not depend on the platform
 * default charset. Call {@link #close()} once the instance is no longer needed to
 * release the watchers and the ZooKeeper connection.
 */
public class ConfigCenter implements AutoCloseable {
    /** Prefix prepended to every key to form its znode path. */
    private static final String CONFIG_ROOT = "/config/";

    private final CuratorFramework client;

    /** Every cache opened by {@link #watchConfig}, retained so {@link #close()} can stop it. */
    private final List<TreeCache> configCaches = new ArrayList<>();

    /**
     * Connects to ZooKeeper and starts the Curator client.
     *
     * @param zkAddress ZooKeeper connection string, e.g. {@code "localhost:2181"}
     */
    public ConfigCenter(String zkAddress) {
        client = CuratorFrameworkFactory.builder()
                .connectString(zkAddress)
                // Exponential back-off: 1000 ms base sleep, at most 3 retries.
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
    }

    /**
     * Reads the current value of a configuration key.
     *
     * @param key key relative to {@code /config/}
     * @return the stored value, or an empty string when the node holds no data
     * @throws Exception whatever Curator reports, e.g. when the node does not exist
     */
    public String getConfig(String key) throws Exception {
        return decode(client.getData().forPath(CONFIG_ROOT + key));
    }

    /**
     * Writes a configuration value, creating the node (and its parents) when missing.
     *
     * <p>The existence check and the write are two separate ZooKeeper calls, so a
     * concurrent writer creating or deleting the same node in between can still make
     * the second call fail.
     *
     * @param key   key relative to {@code /config/}
     * @param value value to store
     * @throws Exception if the ZooKeeper operation fails
     */
    public void setConfig(String key, String value) throws Exception {
        String path = CONFIG_ROOT + key;
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        if (client.checkExists().forPath(path) != null) {
            client.setData().forPath(path, payload);
        } else {
            client.create().creatingParentsIfNeeded().forPath(path, payload);
        }
    }

    /**
     * Watches a key (and everything below it) and forwards node events to the listener.
     *
     * <p>May be called several times; every call opens an independent cache that stays
     * active until {@link #close()}.
     *
     * @param key      key relative to {@code /config/}
     * @param listener callback receiving the full znode path and its decoded value
     * @throws Exception if the cache cannot be started
     */
    public void watchConfig(String key, ConfigChangeListener listener) throws Exception {
        TreeCache cache = new TreeCache(client, CONFIG_ROOT + key);
        // Register before start() so events emitted while the cache is primed are not missed.
        cache.getListenable().addListener((curator, event) -> {
            // Events without node data (e.g. connection events) are not config changes.
            if (event.getData() == null) {
                return;
            }
            listener.onConfigChange(event.getData().getPath(), decode(event.getData().getData()));
        });
        // Track the cache first so close() also cleans up if start() fails.
        configCaches.add(cache);
        cache.start();
    }

    /** Stops every watcher opened by {@link #watchConfig} and closes the ZooKeeper client. */
    @Override
    public void close() {
        try {
            for (TreeCache cache : configCaches) {
                cache.close();
            }
        } finally {
            configCaches.clear();
            client.close();
        }
    }

    /** Decodes znode bytes as UTF-8; a node without data is reported as an empty string. */
    private static String decode(byte[] raw) {
        return raw == null ? "" : new String(raw, StandardCharsets.UTF_8);
    }

    /** Callback invoked by {@link #watchConfig} for every node event that carries data. */
    @FunctionalInterface
    public interface ConfigChangeListener {
        /**
         * @param key   full znode path of the affected node
         * @param value node data decoded as UTF-8
         */
        void onConfigChange(String key, String value);
    }
}
服务注册与发现
架构设计
┌─────────────────────────────────────────────────────────────┐
│ 服务注册与发现架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 服务消费者 │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ 1. 查询服务列表 │ │ │
│ │ │ 2. 选择服务实例 │ │ │
│ │ │ 3. 调用服务 │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ ZooKeeper │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ /services │ │ │
│ │ │ ├── user-service │ │ │
│ │ │ │ ├── instance_0001 (192.168.1.1:8080) │ │ │
│ │ │ │ └── instance_0002 (192.168.1.2:8080) │ │ │
│ │ │ └── order-service │ │ │
│ │ │ └── instance_0001 (192.168.1.3:8080) │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 服务提供者 │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ 1. 启动时注册服务 │ │ │
│ │ │ 2. 关闭时注销服务 │ │ │
│ │ │ 3. 发送心跳 │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
实现
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.*;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
/**
 * Payload stored with every registered service instance.
 *
 * <p>This class shares its simple name with Curator's generic
 * {@code org.apache.curator.x.discovery.ServiceInstance<T>}. Inside this package the
 * simple name always resolves to this payload class, so code that needs Curator's type
 * has to spell it with its fully-qualified name.
 */
public class ServiceInstance {
    private String host;
    private int port;
    private String name;

    /** No-argument constructor so that JSON deserialization can instantiate the payload. */
    public ServiceInstance() {
    }

    /**
     * @param host host name or IP address the instance listens on
     * @param port TCP port the instance listens on
     * @param name logical service name
     */
    public ServiceInstance(String host, int port, String name) {
        this.host = host;
        this.port = port;
        this.name = name;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
// 服务注册
public class ServiceRegistry {
private CuratorFramework client;
private ServiceDiscovery<ServiceInstance> discovery;
public ServiceRegistry(String zkAddress) throws Exception {
client = CuratorFrameworkFactory.builder()
.connectString(zkAddress)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
JsonInstanceSerializer<ServiceInstance> serializer =
new JsonInstanceSerializer<>(ServiceInstance.class);
discovery = ServiceDiscoveryBuilder.builder(ServiceInstance.class)
.client(client)
.basePath("/services")
.serializer(serializer)
.build();
discovery.start();
}
// 注册服务
public void register(String serviceName, String host, int port) throws Exception {
ServiceInstance<ServiceInstance> instance = ServiceInstance.<ServiceInstance>builder()
.name(serviceName)
.address(host)
.port(port)
.payload(new ServiceInstance(host, port, serviceName))
.build();
discovery.registerService(instance);
}
// 注销服务
public void unregister(String serviceName, String instanceId) throws Exception {
ServiceInstance<ServiceInstance> instance = discovery.queryForInstance(serviceName, instanceId);
discovery.unregisterService(instance);
}
// 发现服务
public List<ServiceInstance<ServiceInstance>> discover(String serviceName) throws Exception {
return discovery.queryForInstances(serviceName);
}
}
分布式锁
实现
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
/**
 * Thin wrapper around Curator's {@code InterProcessMutex}.
 *
 * <p>{@link #acquire} and {@link #release} never throw: failures are logged and reported
 * through the return value (acquire) or ignored (release), so {@code release()} is safe
 * to call from a {@code finally} block.
 */
public class DistributedLock implements AutoCloseable {
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(DistributedLock.class.getName());

    private final CuratorFramework client;
    private final InterProcessMutex lock;

    /**
     * Connects to ZooKeeper and prepares a mutex on the given path.
     *
     * @param zkAddress ZooKeeper connection string
     * @param lockPath  znode path used for the lock
     */
    public DistributedLock(String zkAddress, String lockPath) {
        client = CuratorFrameworkFactory.builder()
                .connectString(zkAddress)
                // Exponential back-off: 1000 ms base sleep, at most 3 retries.
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        lock = new InterProcessMutex(client, lockPath);
    }

    /**
     * Tries to acquire the lock, waiting at most the given time.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the lock was acquired; {@code false} on timeout, interruption
     *         or any ZooKeeper failure
     */
    public boolean acquire(long timeout, TimeUnit unit) {
        try {
            return lock.acquire(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            LOG.log(java.util.logging.Level.WARNING, "Failed to acquire lock", e);
            return false;
        }
    }

    /**
     * Releases the lock. Calling this from a thread that does not hold the lock is a no-op.
     */
    public void release() {
        try {
            lock.release();
        } catch (IllegalMonitorStateException e) {
            // The mutex rejects a release by a non-owner, e.g. after a failed acquire().
            LOG.log(java.util.logging.Level.FINE, "release() called without holding the lock", e);
        } catch (Exception e) {
            LOG.log(java.util.logging.Level.WARNING, "Failed to release lock", e);
        }
    }

    /** Closes the underlying ZooKeeper client. */
    @Override
    public void close() {
        client.close();
    }
}
// Usage example
DistributedLock lock = new DistributedLock("localhost:2181", "/locks/order-lock");
if (lock.acquire(10, TimeUnit.SECONDS)) {
    try {
        // Run the business logic while the lock is held.
        processOrder();
    } finally {
        // Release only a lock that was actually acquired.
        lock.release();
    }
}
// If acquire() returned false, the lock was not obtained within 10 seconds and the work is skipped.
集群选举
实现
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
public class LeaderElection {
private CuratorFramework client;
private LeaderSelector leaderSelector;
public LeaderElection(String zkAddress, String electionPath) {
client = CuratorFrameworkFactory.builder()
.connectString(zkAddress)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
leaderSelector = new LeaderSelector(client, electionPath, new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
// 成为 Leader 后执行
System.out.println("成为 Leader");
try {
// 执行 Leader 任务
while (true) {
Thread.sleep(1000);
}
} finally {
// 释放 Leadership
System.out.println("释放 Leadership");
}
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
// 处理连接状态变化
}
});
leaderSelector.autoRequeue();
leaderSelector.start();
}
public boolean isLeader() {
return leaderSelector.hasLeadership();
}
}
小结
应用场景要点：
- 配置中心：集中管理、动态更新
- 服务注册与发现：自动注册、健康检查
- 分布式锁：互斥访问、防止重复
- 集群选举：Leader 选举、故障转移
下一章我们将学习生产实践。