跳转至

第七章:应用场景

配置中心

架构设计

┌─────────────────────────────────────────────────────────────┐
│                    配置中心架构                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   应用服务                           │   │
│  │  ┌───────────┐ ┌───────────┐ ┌───────────┐         │   │
│  │  │ Service A │ │ Service B │ │ Service C │         │   │
│  │  └─────┬─────┘ └─────┬─────┘ └─────┬─────┘         │   │
│  │        │             │             │                │   │
│  │        └─────────────┼─────────────┘                │   │
│  │                      │                              │   │
│  │                      ▼                              │   │
│  │              ┌───────────────┐                      │   │
│  │              │ Watch 配置变化│                      │   │
│  │              └───────────────┘                      │   │
│  └─────────────────────────────────────────────────────┘   │
│                           │                                 │
│                           ▼                                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   ZooKeeper                          │   │
│  │  ┌───────────────────────────────────────────────┐ │   │
│  │  │  /config                                       │ │   │
│  │  │  ├── database/mysql                            │ │   │
│  │  │  ├── database/redis                            │ │   │
│  │  │  ├── app/timeout                               │ │   │
│  │  │  └── app/feature-flags                         │ │   │
│  │  └───────────────────────────────────────────────┘ │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

实现

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCache;

public class ConfigCenter {
    private CuratorFramework client;
    private TreeCache configCache;

    public ConfigCenter(String zkAddress) {
        client = CuratorFrameworkFactory.builder()
            .connectString(zkAddress)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
        client.start();
    }

    // 获取配置
    public String getConfig(String key) throws Exception {
        String path = "/config/" + key;
        byte[] data = client.getData().forPath(path);
        return new String(data);
    }

    // 设置配置
    public void setConfig(String key, String value) throws Exception {
        String path = "/config/" + key;
        if (client.checkExists().forPath(path) != null) {
            client.setData().forPath(path, value.getBytes());
        } else {
            client.create().creatingParentsIfNeeded().forPath(path, value.getBytes());
        }
    }

    // 监听配置变化
    public void watchConfig(String key, ConfigChangeListener listener) throws Exception {
        String path = "/config/" + key;
        configCache = new TreeCache(client, path);
        configCache.start();

        configCache.getListenable().addListener((client, event) -> {
            if (event.getData() != null) {
                String configKey = event.getData().getPath();
                String configValue = new String(event.getData().getData());
                listener.onConfigChange(configKey, configValue);
            }
        });
    }

    public interface ConfigChangeListener {
        void onConfigChange(String key, String value);
    }
}

服务注册与发现

架构设计

┌─────────────────────────────────────────────────────────────┐
│                  服务注册与发现架构                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   服务消费者                         │   │
│  │  ┌───────────────────────────────────────────────┐ │   │
│  │  │  1. 查询服务列表                              │ │   │
│  │  │  2. 选择服务实例                              │ │   │
│  │  │  3. 调用服务                                  │ │   │
│  │  └───────────────────────────────────────────────┘ │   │
│  └─────────────────────────────────────────────────────┘   │
│                           │                                 │
│                           ▼                                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   ZooKeeper                          │   │
│  │  ┌───────────────────────────────────────────────┐ │   │
│  │  │  /services                                     │ │   │
│  │  │  ├── user-service                             │ │   │
│  │  │  │   ├── instance_0001 (192.168.1.1:8080)    │ │   │
│  │  │  │   └── instance_0002 (192.168.1.2:8080)    │ │   │
│  │  │  └── order-service                            │ │   │
│  │  │      └── instance_0001 (192.168.1.3:8080)    │ │   │
│  │  └───────────────────────────────────────────────┘ │   │
│  └─────────────────────────────────────────────────────┘   │
│                           ▲                                 │
│                           │                                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   服务提供者                         │   │
│  │  ┌───────────────────────────────────────────────┐ │   │
│  │  │  1. 启动时注册服务                            │ │   │
│  │  │  2. 关闭时注销服务                            │ │   │
│  │  │  3. 发送心跳                                  │ │   │
│  │  └───────────────────────────────────────────────┘ │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

实现

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.*;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

// 服务实例
public class ServiceInstance {
    private String host;
    private int port;
    private String name;

    // getters and setters
}

// 服务注册
public class ServiceRegistry {
    private CuratorFramework client;
    private ServiceDiscovery<ServiceInstance> discovery;

    public ServiceRegistry(String zkAddress) throws Exception {
        client = CuratorFrameworkFactory.builder()
            .connectString(zkAddress)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
        client.start();

        JsonInstanceSerializer<ServiceInstance> serializer = 
            new JsonInstanceSerializer<>(ServiceInstance.class);

        discovery = ServiceDiscoveryBuilder.builder(ServiceInstance.class)
            .client(client)
            .basePath("/services")
            .serializer(serializer)
            .build();
        discovery.start();
    }

    // 注册服务
    public void register(String serviceName, String host, int port) throws Exception {
        ServiceInstance<ServiceInstance> instance = ServiceInstance.<ServiceInstance>builder()
            .name(serviceName)
            .address(host)
            .port(port)
            .payload(new ServiceInstance(host, port, serviceName))
            .build();

        discovery.registerService(instance);
    }

    // 注销服务
    public void unregister(String serviceName, String instanceId) throws Exception {
        ServiceInstance<ServiceInstance> instance = discovery.queryForInstance(serviceName, instanceId);
        discovery.unregisterService(instance);
    }

    // 发现服务
    public List<ServiceInstance<ServiceInstance>> discover(String serviceName) throws Exception {
        return discovery.queryForInstances(serviceName);
    }
}

分布式锁

实现

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class DistributedLock {
    private CuratorFramework client;
    private InterProcessMutex lock;

    public DistributedLock(String zkAddress, String lockPath) {
        client = CuratorFrameworkFactory.builder()
            .connectString(zkAddress)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
        client.start();

        lock = new InterProcessMutex(client, lockPath);
    }

    // 获取锁
    public boolean acquire(long timeout, TimeUnit unit) {
        try {
            return lock.acquire(timeout, unit);
        } catch (Exception e) {
            return false;
        }
    }

    // 释放锁
    public void release() {
        try {
            lock.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// 使用示例
DistributedLock lock = new DistributedLock("localhost:2181", "/locks/order-lock");

try {
    if (lock.acquire(10, TimeUnit.SECONDS)) {
        // 执行业务逻辑
        processOrder();
    }
} finally {
    lock.release();
}

集群选举

实现

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;

public class LeaderElection {
    private CuratorFramework client;
    private LeaderSelector leaderSelector;

    public LeaderElection(String zkAddress, String electionPath) {
        client = CuratorFrameworkFactory.builder()
            .connectString(zkAddress)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
        client.start();

        leaderSelector = new LeaderSelector(client, electionPath, new LeaderSelectorListener() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                // 成为 Leader 后执行
                System.out.println("成为 Leader");

                try {
                    // 执行 Leader 任务
                    while (true) {
                        Thread.sleep(1000);
                    }
                } finally {
                    // 释放 Leadership
                    System.out.println("释放 Leadership");
                }
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                // 处理连接状态变化
            }
        });

        leaderSelector.autoRequeue();
        leaderSelector.start();
    }

    public boolean isLeader() {
        return leaderSelector.hasLeadership();
    }
}

小结

应用场景要点:

  • 配置中心:集中管理、动态更新
  • 服务注册与发现:自动注册、健康检查
  • 分布式锁:互斥访问、防止重复
  • 集群选举:Leader 选举、故障转移

下一章我们将学习生产实践。