扩展实现Dubbo Pipeline机制优化接口调用的思路

发布时间:2024年01月24日

背景

在许多业务研发团队中,都会有一些底层的基础数据服务,如用户服务、机构服务、商品服务和地址服务等。例如,在我们的团队中,就有一个名为“基础数据服务”的应用,它提供了商品、用户、机构等多种维度数据的 Dubbo 服务。相较于其他业务服务,这个服务承受的压力是最大的。

以查询订单信息为例,我们需要调用基础数据服务三次来查询用户、地址和商品信息:

    public OrderInfo getOrderInfo(){
        //获取用户信息,假设耗时 100ms
       User user =  basicDataUserService.getUser();
       //获取地址信息,假设耗时 200ms
       Address address = basicDataUserService.getAddress(user.getAddressId());
       //获取商品信息,假设耗时 300ms
       Goods goods =  basicDataGoodsService.getGoodsInfo();
       ...
    }

如果需要查询一万次订单信息,那么基础数据服务的访问量就会被放大三倍,达到三万次。这种情况下,基础数据服务的压力会非常大。因此,我们需要寻找一种方法来优化这个过程,以减少对基础数据服务的压力。

问题分析

以上场景可以考虑以下几种优化策略:

  • 并行执行:可以将串行的操作改为并行执行。例如使用 Future,这样整体的耗时可能会降低到 300ms 左右。

  • 接口聚合:可以考虑将部分接口聚合。例如,地址信息其实是用户信息的一部分,可以提供一个同时返回用户和地址信息的接口。然而,这种策略在实际的业务场景中可能很难实现。因为在某些场景中,我们可能只需要获取用户的基本信息,而不需要地址信息。这就需要提供两个接口:一个只返回用户基本信息,另一个返回用户基本信息和地址信息。但是,用户信息可能还包括其他属性,如权限信息和会员信息。如果每增加一种属性,就需要新增一个接口,那么接口的设计就会变得混乱,这在某种程度上违反了“单一职责原则”。

这就引出了本文的主题:能否像 Redis 那样,支持类似于 Pipeline 的机制?在这种机制中,Consumer 可以一次性将所有的请求参数和函数发送给 Provider,然后 Provider 在全部执行完成后一次性返回结果给 Consumer。

Redis Pipeline

先看一下 Redis 的 Pipeline 的使用例子:

/**
 * @author dongguabai
 * @date 2024-01-17 19:59
 */
public class JedisExample {

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        Pipeline pipeline = jedis.pipelined();
        pipeline.set("name", "John");
        pipeline.set("age", "25");
        pipeline.incr("visits");
        //执行 pipeline 中的所有命令
        pipeline.sync();
    }
}

Redis 的 Pipeline 机制允许客户端一次性发送多个命令到服务器,然后服务器一次性返回所有命令的结果。这种机制可以减少客户端与服务器之间的网络往返次数,从而提高性能。

在常规的请求/响应模型中,客户端发送一个命令,然后等待服务器的响应,然后再发送下一个命令。这种模型在网络延迟较高的情况下可能会导致性能问题,因为客户端需要等待每个命令的响应。而使用 Pipeline 机制,客户端可以一次性发送多个命令,然后等待所有命令的响应。这样可以减少网络延迟的影响,提高性能。

实现 Dubbo Pipeline

Dubbo 本身并未直接提供类似于 Redis 的 Pipeline 机制的支持。Dubbo 的主要设计是基于一次请求一次响应的模型。但可以通过一些技术手段来实现类似的功能。

请求操作:

/**
 * @author dongguabai
 * @date 2024-01-17 20:02
 */
public class Operation implements Serializable {

    private Class<?> serviceClass;

    private String methodName;

    private Class<?>[] parameterTypes;

    private Object[] args;

    public Operation setServiceClass(Class<?> serviceClass) {
        if (!serviceClass.isInterface()) {
            throw new IllegalArgumentException("serviceClass must be an interface");
        }
        this.serviceClass = serviceClass;
        return this;
    }

    public Operation setMethodName(String methodName) {
        this.methodName = methodName;
        return this;
    }

    public Operation setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
        return this;
    }

    public Operation setArgs(Object[] args) {
        this.args = args;
        return this;
    }

    public Class<?> getServiceClass() {
        return serviceClass;
    }

    public String getMethodName() {
        return methodName;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public Object[] getArgs() {
        return args;
    }
}

操作聚合为一个 Pipeline:

/**
 * @author dongguabai
 * @date 2024-01-17 20:05
 */
public class Pipeline {
    private List<Operation> operations = new ArrayList<>();

    public Pipeline addOperation(Operation operation) {
        operations.add(operation);
        return this;
    }

    public List<Operation> getOperations() {
        return operations;
    }
}

Pipeline 接口:

/**
 * @author dongguabai
 * @date 2024-01-17 20:01
 */
public interface PipelineService {

    List<OperationResult> executeBatch(Pipeline pipeline);
}

实现类,并行执行多个 Operation:

@Service
public class PipelineServiceImpl implements PipelineService {

    @Autowired
    private ApplicationContext applicationContext;

    @Override
    public List<OperationResult> executeBatch(Pipeline pipeline) {
        List<CompletableFuture<OperationResult>> futures = pipeline.getOperations().stream()
                .map(operation -> CompletableFuture.supplyAsync(() -> {
                    Object service = applicationContext.getBean(operation.getServiceClass());
                    try {
                        Method method = getMethod(service.getClass(), operation.getMethodName(), operation.getParameterTypes());
                        Object result = method.invoke(service, operation.getArgs());
                        return new OperationResult(result, null);
                    } catch (Throwable e) {
                        e.printStackTrace();
                        return new OperationResult(null, e);
                    }
                }))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    private Method getMethod(Class<?> serviceClass, String methodName, Class<?>[] parameterTypes) throws NoSuchMethodException {
        return serviceClass.getMethod(methodName, parameterTypes);
    }
}

执行结果包装:

/**
 * @author dongguabai
 * @date 2024-01-17 20:20
 */
public class OperationResult implements Serializable {

    private Object result;

    private Throwable exception;

    public OperationResult(Object result) {
        this.result = result;
    }

    public OperationResult(Exception exception) {
        this.exception = exception;
    }

    public boolean hasException() {
        return exception != null;
    }

    public OperationResult(Object result, Throwable exception) {
        this.result = result;
        this.exception = exception;
    }
}

使用示例:

/**
 * @author dongguabai
 * @date 2024-01-17 20:09
 */
public class PipelineDemo {

    @Reference
    private PipelineService pipelineService;

    public void execute() {
        Operation operation1 = new Operation()
                .setServiceClass(UserService.class)
                .setMethodName("findById")
                .setParameterTypes(new Class<?>[]{int.class})
                .setArgs(new Object[]{1});

        Operation operation2 = new Operation()
                .setServiceClass(GoodsService.class)
                .setMethodName("findById")
                .setParameterTypes(new Class<?>[]{int.class})
                .setArgs(new Object[]{2});

        Pipeline pipeline = new Pipeline()
                .addOperation(operation1)
                .addOperation(operation2);
        List<OperationResult> operationResults = pipelineService.executeBatch(pipeline);
    }
}

分析

从 Consumer 端和 Provider 端的角度,对这套基于 Dubbo 的 Pipeline 机制做一个分析:

  • Consumer

    • 优点

      • 减少网络请求次数:通过一次性发送多个请求,可以减少网络请求的次数,从而减少网络通信的开销。
      • 简化编程模型:Consumer 端只需要发送一次请求,然后等待结果,而不需要处理多个请求的复杂性(但是从业务代码的实现上来看,其实是降低了可读性)。
    • 缺点

      • 超时时间设置:如果 Provider 端并行处理请求,那么最慢的请求可能会决定整个请求的响应时间,但由于具体执行的函数是动态传入,这可能会导致 Consumer 端的响应时间增加。
  • Provider

    • 优点:
      • 减少网络请求次数:由于所有的请求都在一个请求中接收,Provider 端可以一次性处理所有的请求,这可以减少网络请求的次数,从而减少网络通信的开销。

总的来说,这套基于 Dubbo 的 Pipeline 机制可以减少网络请求次数,但也可能增加响应时间。更适用于那些需要连续多次调用同一个服务的场景。在这种场景下,通过将多个请求合并为一个请求,可以有效地减少网络请求的次数,从而减少网络通信的开销。

欢迎关注公众号:
在这里插入图片描述

文章来源:https://blog.csdn.net/Dongguabai/article/details/135818571
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。