跳转至

第四章:服务消费者

服务引用

注解引用

import org.apache.dubbo.config.annotation.DubboReference;

@RestController
public class UserController {

    @DubboReference(
        version = "1.0.0",
        timeout = 5000,
        retries = 2,
        loadbalance = "roundrobin"
    )
    private UserService userService;

    @GetMapping("/user/{id}")
    public User getUser(@PathVariable Long id) {
        return userService.getUser(id);
    }
}

引用配置

@DubboReference(
    // 服务版本
    version = "1.0.0",

    // 服务分组
    group = "user-group",

    // 超时时间
    timeout = 5000,

    // 重试次数
    retries = 2,

    // 负载均衡
    loadbalance = "roundrobin",

    // 集群容错
    cluster = "failover",

    // 异步调用
    async = false,

    // 是否检查
    check = false,

    // 是否懒加载
    lazy = true,

    // 是否泛化调用
    generic = false,

    // 是否本地存根
    stub = "com.example.UserServiceStub"
)
private UserService userService;

方法级配置

@DubboReference(
    version = "1.0.0",
    methods = {
        @Method(name = "getUser", timeout = 3000, retries = 2),
        @Method(name = "createUser", timeout = 10000, retries = 0),
        @Method(name = "listUsers", timeout = 5000, loadbalance = "random")
    }
)
private UserService userService;

调用方式

同步调用

// 同步调用(默认)
User user = userService.getUser(1L);
System.out.println(user);

异步调用

import org.apache.dubbo.rpc.RpcContext;
import java.util.concurrent.CompletableFuture;

// 方式一:RpcContext
@DubboReference(async = true)
private UserService userService;

// 调用
userService.getUser(1L);
CompletableFuture<User> future = RpcContext.getContext().getCompletableFuture();
User user = future.get();

// 方式二:返回 CompletableFuture
@DubboReference
private UserService userService;

CompletableFuture<User> future = userService.getUserAsync(1L);
User user = future.get();

参数回调

// 服务接口
public interface UserService {
    void addListener(String key, CallbackListener listener);
}

// 消费者
userService.addListener("user-event", new CallbackListener() {
    @Override
    public void onEvent(String event) {
        System.out.println("收到回调: " + event);
    }
});

本地存根

// 本地存根
public class UserServiceStub implements UserService {

    private final UserService userService;

    public UserServiceStub(UserService userService) {
        this.userService = userService;
    }

    @Override
    public User getUser(Long id) {
        // 本地逻辑
        if (id == null) {
            return null;
        }

        try {
            return userService.getUser(id);
        } catch (Exception e) {
            // 降级处理
            return new User();
        }
    }
}

// 使用存根
@DubboReference(stub = "com.example.UserServiceStub")
private UserService userService;

本地伪装

// 本地伪装
public class UserServiceMock implements UserService {

    @Override
    public User getUser(Long id) {
        // 返回默认值
        User user = new User();
        user.setId(id);
        user.setName("mock-user");
        return user;
    }
}

// 使用伪装
@DubboReference(mock = "com.example.UserServiceMock")
private UserService userService;

// 或者使用简单值
@DubboReference(mock = "return null")
private UserService userService;

@DubboReference(mock = "return {id:1,name:'mock'}")
private UserService userService;

集群容错

Failover(失败重试)

// 失败重试(默认)
@DubboReference(
    cluster = "failover",
    retries = 2  // 重试次数
)
private UserService userService;

Failfast(快速失败)

// 快速失败,不重试
@DubboReference(cluster = "failfast")
private UserService userService;

Failsafe(失败安全)

// 失败安全,忽略异常
@DubboReference(cluster = "failsafe")
private UserService userService;

Failback(失败恢复)

// 失败自动恢复,后台重试
@DubboReference(cluster = "failback")
private UserService userService;

Forking(并行调用)

// 并行调用,一个成功即返回
@DubboReference(
    cluster = "forking",
    forks = 2  // 并行数
)
private UserService userService;

Broadcast(广播调用)

// 广播调用,所有提供者都执行
@DubboReference(cluster = "broadcast")
private UserService userService;

负载均衡

Random(随机)

// 随机负载均衡(默认)
@DubboReference(loadbalance = "random")
private UserService userService;

RoundRobin(轮询)

// 轮询负载均衡
@DubboReference(loadbalance = "roundrobin")
private UserService userService;

LeastActive(最少活跃)

// 最少活跃调用数
@DubboReference(loadbalance = "leastactive")
private UserService userService;

ConsistentHash(一致性哈希)

// 一致性哈希
@DubboReference(
    loadbalance = "consistenthash",
    arguments = 0  // 第一个参数作为哈希键
)
private UserService userService;

泛化调用

泛化接口

import org.apache.dubbo.rpc.service.GenericService;

@DubboReference(interfaceName = "com.example.UserService", generic = true)
private GenericService userService;

// 调用
Object result = userService.$invoke("getUser", new String[]{"java.lang.Long"}, new Object[]{1L});

泛化实现

import org.apache.dubbo.rpc.service.GenericService;

@DubboService(interfaceName = "com.example.UserService")
public class GenericUserServiceImpl implements GenericService {

    @Override
    public Object $invoke(String method, String[] parameterTypes, Object[] args) {
        if ("getUser".equals(method)) {
            Long id = (Long) args[0];
            // 处理逻辑
            return getUser(id);
        }
        return null;
    }
}

小结

服务消费者要点:

  • 服务引用:注解引用、引用配置
  • 调用方式:同步、异步、回调、存根、伪装
  • 集群容错:Failover、Failfast、Failsafe、Failback、Forking、Broadcast
  • 负载均衡:Random、RoundRobin、LeastActive、ConsistentHash
  • 泛化调用:泛化接口、泛化实现

下一章我们将学习注册中心。