dubbo:异步

发布时间:2024年01月23日

从 2.7.0 开始,Dubbo 的所有异步编程接口开始以 CompletableFuture 为基础

Provider端异步执行将阻塞的业务从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池的过度占用,有助于避免不同服务间的互相影响。异步执行无异于节省资源或提升RPC响应性能,因为如果业务执行需要阻塞,则始终还是要有线程来负责执行。

Provider 端异步执行和 Consumer 端异步调用是相互独立的,可以任意正交组合两端配置

Consumer同步 - Provider同步
Consumer异步 - Provider同步
Consumer同步 - Provider异步
Consumer异步 - Provider异步

provider异步:
定义一个CompletableFuture的接口

public interface AsyncService {
    CompletableFuture<String> sayHello(String name);
}
 
 
 
 
继承:
public class AsyncServiceImpl implements AsyncService {
    @Override
    public CompletableFuture<String> sayHello(String name) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println(name);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "async response from provider.";
        });
    }
}```


使用AsyncContext
Dubbo 提供了一个类似 Servlet 3.0 的异步接口AsyncContext,在原有同步方法上实现异步。

服务暴露的流程和其他服务一样。


```java
public class AsyncServiceImpl implements AsyncService {
    public String sayHello(String name) {
        final AsyncContext asyncContext = RpcContext.startAsync();
        new Thread(() -> {
            // 如果要使用上下文,则必须要放在第一句执行
            asyncContext.signalContextSwitch();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 写回响应
            asyncContext.write("Hello " + name + ", response from provider.");
        }).start();
        return null;
    }
}

consumer异步:

使用RpcContext调用

配置:加入属性async
<dubbo:reference id=“asyncService” interface=“org.apache.dubbo.samples.governance.api.AsyncService”>
<dubbo:method name=“sayHello” async=“true” />
</dubbo:reference>

调用1:

// 此调用会立即返回null
asyncService.sayHello("world");
// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
CompletableFuture<String> helloFuture = RpcContext.getServiceContext().getCompletableFuture();
// 为Future添加回调
helloFuture.whenComplete((retValue, exception) -> {
    if (exception == null) {
        System.out.println(retValue);
    } else {
        exception.printStackTrace();
    }
});```


调用2:

```java
public void testsayHellocompletableFuture()throws ExecutionException,InterruptedException,IOException{
CompletableFutureAsyncService service = (CompletableFutureAsyncService)context.getBean("AsyncService");
CompletableFuture<string>completableFuture =  service.sayHelloAsync("AsyncService:hello dubbo");
LOGGER.info("testsayHel1o->"+completableFuture.get())}
 
 
 
 
 
 
调用三:
CompletableFuture<String> future = RpcContext.getServiceContext().asyncCall(
    () -> {
        asyncService.sayHello("oneway call request1");
    }
);
 
future.get();

文章来源:https://blog.csdn.net/weixin_43675252/article/details/135756143
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。