在许多业务研发团队中,都会有一些底层的基础数据服务,如用户服务、机构服务、商品服务和地址服务等。例如,在我们的团队中,就有一个名为“基础数据服务”的应用,它提供了商品、用户、机构等多种维度数据的 Dubbo 服务。相较于其他业务服务,这个服务承受的压力是最大的。
以查询订单信息为例,我们需要调用基础数据服务三次来查询用户、地址和商品信息:
public OrderInfo getOrderInfo(){
//获取用户信息,假设耗时 100ms
User user = basicDataUserService.getUser();
//获取地址信息,假设耗时 200ms
Address address = basicDataUserService.getAddress(user.getAddressId());
//获取商品信息,假设耗时 300ms
Goods goods = basicDataGoodsService.getGoodsInfo();
...
}
如果需要查询一万次订单信息,那么基础数据服务的访问量就会被放大三倍,达到三万次。这种情况下,基础数据服务的压力会非常大。因此,我们需要寻找一种方法来优化这个过程,以减少对基础数据服务的压力。
以上场景可以考虑以下几种优化策略:
并行执行:可以将串行的操作改为并行执行。例如使用 Future
,这样整体的耗时可能会降低到 300ms 左右。
接口聚合:可以考虑将部分接口聚合。例如,地址信息其实是用户信息的一部分,可以提供一个同时返回用户和地址信息的接口。然而,这种策略在实际的业务场景中可能很难实现。因为在某些场景中,我们可能只需要获取用户的基本信息,而不需要地址信息。这就需要提供两个接口:一个只返回用户基本信息,另一个返回用户基本信息和地址信息。但是,用户信息可能还包括其他属性,如权限信息和会员信息。如果每增加一种属性,就需要新增一个接口,那么接口的设计就会变得混乱,这在某种程度上违反了“单一职责原则”。
这就引出了本文的主题:能否像 Redis 那样,支持类似于 Pipeline 的机制?在这种机制中,Consumer 可以一次性将所有的请求参数和函数发送给 Provider,然后 Provider 在全部执行完成后一次性返回结果给 Consumer。
先看一下 Redis 的 Pipeline 的使用例子:
/**
* @author dongguabai
* @date 2024-01-17 19:59
*/
public class JedisExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost");
Pipeline pipeline = jedis.pipelined();
pipeline.set("name", "John");
pipeline.set("age", "25");
pipeline.incr("visits");
//执行 pipeline 中的所有命令
pipeline.sync();
}
}
Redis 的 Pipeline 机制允许客户端一次性发送多个命令到服务器,然后服务器一次性返回所有命令的结果。这种机制可以减少客户端与服务器之间的网络往返次数,从而提高性能。
在常规的请求/响应模型中,客户端发送一个命令,然后等待服务器的响应,然后再发送下一个命令。这种模型在网络延迟较高的情况下可能会导致性能问题,因为客户端需要等待每个命令的响应。而使用 Pipeline 机制,客户端可以一次性发送多个命令,然后等待所有命令的响应。这样可以减少网络延迟的影响,提高性能。
Dubbo 本身并未直接提供类似于 Redis 的 Pipeline 机制的支持。Dubbo 的主要设计是基于一次请求一次响应的模型。但可以通过一些技术手段来实现类似的功能。
请求操作:
/**
* @author dongguabai
* @date 2024-01-17 20:02
*/
public class Operation implements Serializable {
private Class<?> serviceClass;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] args;
public Operation setServiceClass(Class<?> serviceClass) {
if (!serviceClass.isInterface()) {
throw new IllegalArgumentException("serviceClass must be an interface");
}
this.serviceClass = serviceClass;
return this;
}
public Operation setMethodName(String methodName) {
this.methodName = methodName;
return this;
}
public Operation setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
return this;
}
public Operation setArgs(Object[] args) {
this.args = args;
return this;
}
public Class<?> getServiceClass() {
return serviceClass;
}
public String getMethodName() {
return methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public Object[] getArgs() {
return args;
}
}
操作聚合为一个 Pipeline:
/**
* @author dongguabai
* @date 2024-01-17 20:05
*/
public class Pipeline {
private List<Operation> operations = new ArrayList<>();
public Pipeline addOperation(Operation operation) {
operations.add(operation);
return this;
}
public List<Operation> getOperations() {
return operations;
}
}
Pipeline 接口:
/**
* @author dongguabai
* @date 2024-01-17 20:01
*/
public interface PipelineService {
List<OperationResult> executeBatch(Pipeline pipeline);
}
实现类,并行执行多个 Operation:
@Service
public class PipelineServiceImpl implements PipelineService {
@Autowired
private ApplicationContext applicationContext;
@Override
public List<OperationResult> executeBatch(Pipeline pipeline) {
List<CompletableFuture<OperationResult>> futures = pipeline.getOperations().stream()
.map(operation -> CompletableFuture.supplyAsync(() -> {
Object service = applicationContext.getBean(operation.getServiceClass());
try {
Method method = getMethod(service.getClass(), operation.getMethodName(), operation.getParameterTypes());
Object result = method.invoke(service, operation.getArgs());
return new OperationResult(result, null);
} catch (Throwable e) {
e.printStackTrace();
return new OperationResult(null, e);
}
}))
.collect(Collectors.toList());
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
private Method getMethod(Class<?> serviceClass, String methodName, Class<?>[] parameterTypes) throws NoSuchMethodException {
return serviceClass.getMethod(methodName, parameterTypes);
}
}
执行结果包装:
/**
* @author dongguabai
* @date 2024-01-17 20:20
*/
public class OperationResult implements Serializable {
private Object result;
private Throwable exception;
public OperationResult(Object result) {
this.result = result;
}
public OperationResult(Exception exception) {
this.exception = exception;
}
public boolean hasException() {
return exception != null;
}
public OperationResult(Object result, Throwable exception) {
this.result = result;
this.exception = exception;
}
}
使用示例:
/**
* @author dongguabai
* @date 2024-01-17 20:09
*/
public class PipelineDemo {
@Reference
private PipelineService pipelineService;
public void execute() {
Operation operation1 = new Operation()
.setServiceClass(UserService.class)
.setMethodName("findById")
.setParameterTypes(new Class<?>[]{int.class})
.setArgs(new Object[]{1});
Operation operation2 = new Operation()
.setServiceClass(GoodsService.class)
.setMethodName("findById")
.setParameterTypes(new Class<?>[]{int.class})
.setArgs(new Object[]{2});
Pipeline pipeline = new Pipeline()
.addOperation(operation1)
.addOperation(operation2);
List<OperationResult> operationResults = pipelineService.executeBatch(pipeline);
}
}
从 Consumer 端和 Provider 端的角度,对这套基于 Dubbo 的 Pipeline 机制做一个分析:
Consumer
优点
缺点
Provider
总的来说,这套基于 Dubbo 的 Pipeline 机制可以减少网络请求次数,但也可能增加响应时间。更适用于那些需要连续多次调用同一个服务的场景。在这种场景下,通过将多个请求合并为一个请求,可以有效地减少网络请求的次数,从而减少网络通信的开销。
欢迎关注公众号: