在Dubbo服务导出源码分析中,已经分析过服务导出的原理,但是留了一块小细节没有详细介绍,那就是服务导出的时候是如何启动网络服务的(以Netty为例)以及如何和我们的处理器进行绑定。重新回到DubboProtocol.export(Invoker<T> invoker)
方法这里,这里会把我们生成的invoker对象传进来,包装为一个DubboExporter对象。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
// 构造一个Exporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//忽略非主线代码
// 开启NettyServer
openServer(url);
optimizeSerialization(url);
return exporter;
}
开启NettyServer的方法就是在openServer(url)
方法中,我们接着看这个方法的逻辑:
private void openServer(URL url) {
// 获得ip地址和port, ip:端口
String key = url.getAddress();
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 缓存Server对象
ExchangeServer server = serverMap.get(key);
// DCL,Double Check Lock
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 创建Server,并进行缓存
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
// 服务重新导出时,就会走这里
server.reset(url);
}
}
}
这块其实有三层逻辑:
1.如果动态配置的信息修改了,那么会重新进行服务导出,然后会调用server.reset(url)
方法,进行服务重置。
2.如果当前服务还没有开启Netty服务,通过DCL机制调用createServer(url)
方法创建一个Netty连接,并放在缓存中。
3.如果缓存中已经有当前服务的Server了,那么就不会再开启一个NettyServer。这里假设我们是第一次导出,调用createServer(url)
方法。
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
// 协议的服务器端实现类型,比如:dubbo协议的mina,netty等,http协议的jetty,servlet等,默认为netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
// 通过url绑定端口,和对应的请求处理器
ExchangeServer server;
try {
// requestHandler是请求处理器,类型为ExchangeHandler
// 表示从url的端口接收到请求后,requestHandler来进行处理
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 协议的客户端实现类型,比如:dubbo协议的mina,netty等
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
1.注意这里的URL参数添加了一个参数CODEC_KEY,默认就是Dubbo协议,在构建Netty连接的时候设置编码器使用。
2.调用Exchangers.bind(url, requestHandler)
方法获取ExchangeServer具体实例,需要注意的是这里传入了一个requestHandler
在消费者进行服务调用的时候会调用其子类的reply
方法。
继续看Exchangers.bind(url, requestHandler)
方法
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
// codec表示协议编码方式
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 通过url得到HeaderExchanger, 利用HeaderExchanger进行bind,将得到一个HeaderExchangeServer
return getExchanger(url).bind(url, handler);
}
getExchanger(url)
这里会使用SPI机制拿到Exchanger接口的默认实现类HeaderExchanger
的一个实例,所以我们需要看HeaderExchanger.bind
方法
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
虽然只有一行代码,其实做了很多事情:
1.对传进来的handler进行包装,第一层为HeaderExchangeHandler,第二层为DecodeHandler
2.然后调用Transporters.bind
方法,所以接着看Transporters.bind
方法
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
//省略非主线代码
// 如果bind了多个handler,那么当有一个连接过来时,会循环每个handler去处理连接
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
// 调用NettyTransporter去绑定,Transporter表示网络传输层
return getTransporter().bind(url, handler);
}
getTransporter().bind(url, handler)
这里仍然是SPI机制,拿到Transporter接口的具体实现,这里是Netty,所以会调用NettyTransporter.bind
方法
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
继续往下追踪,来到NettyServer
构造方法
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
重点看下ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))
方法,它会调用ChannelHandlers.wrapInternal
方法
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
1.首先会拿到Dispatcher接口的一个实现类,默认是AllDispatcher,这里和Dubbo的线程模型有关系,后边会详细讲
2.调用AllDispatcher.dispatch
方法
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
这一步就是就是对handler进一步包装,生成一个AllChannelHandler
。
3.返回的AllChannelHandler
又会被HeartbeatHandler
和MultiMessageHandler
进行包装,最终我们的handler
的样子:
接着回到主线
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
这里调用了NettyServer的父类==AbstractServer
==构造方法:
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
try {
doOpen();
} catch (Throwable t) {
}
}
重点看doOpen
方法,其实是由子类实现的,这里是NettyServer.open
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
熟悉Netty的都知道,这里就是开启一个Netty服务了,我们重点看final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this)
,在这里生成NettyServerHandler
,里边包含了我们之前包装了好几层的handler,如下图
至此我们服务端的Netty服务已建立完毕,且形成一系列的handler,最终的handler链路如下图,当我们服务调用的时候会经过一系列handler的处理
当我们请求来的时候就会调用NettyServerHandler.channlRead
方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收到数据
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
然后调用我们的handler.received(channel, msg)
方法进行方法的处理,这里调用的是责任链模式。按照层级关系调用各个handler的方法进行处理
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
这里就是判断数据是否是MultiMessage类型,如果使用原生方式的可以在消费端一次发生多条数据,MultiMessageHandler的职责就是处理多条数据的handler,假设我们只有一条数据,接着调用下一个handler
@Override
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if (logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
return;
}
//todo
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
}
return;
}
handler.received(channel, message);
}
1.记录调用时间
2.如果是心跳请求并且需要双端响应,那么会返回一个心跳响应给客户端
3.判断是否是心跳响应,如果是什么都不做
4.继续调用下一个handler方法
AllChannelHandler这个就有说法了,之前我们已经说过这里跟Dubbo的线程模型有关系,可以参考官方文档:Dubbo线程模型
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
// 交给线程池去处理message
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
}
在AllChannelHandler中,会把我们的请求封装成ChannelEventRunnable交给线程池去执行,从而释放我们Netty的IO线程,继续处理其他的请求。
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
这里跟Dubbo的序列化和反序列化没关系,而是判断请求数据是否实现了Decodeable接口,如果实现了则调用其decode方法,相当于一个扩展点,然后接着调用下一个handler方法。
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
// 如果是双向通行,则需要返回调用结果
handleRequest(exchangeChannel, request);
} else {
// 如果是单向通信,仅向后调用指定服务即可,无需返回调用结果
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
这里做了一个判断
1.请求端需要响应结果的处理: handleRequest(exchangeChannel, request)
2.请求端不需要响应结果的处理:handler.received(exchangeChannel, request.getData())
我们这里以双端通信为例,看看 handleRequest(exchangeChannel, request)
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
// 请求id,请求版本
Response res = new Response(req.getId(), req.getVersion());
//忽略非主线代码
//获取 data 字段值,也就是 RpcInvocation 对象,表示请求内容
Object msg = req.getData();
try {
// 继续向下调用,分异步调用和同步调用,如果是同步则会阻塞,如果是异步则不会阻塞
CompletionStage<Object> future = handler.reply(channel, msg); // 异步执行服务
// 如果是同步调用则直接拿到结果,并发送到channel中去
// 如果是异步调用则会监听,直到拿到服务执行结果,然后发送到channel中去
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
// 服务执行过程中出现了异常,则把Throwable转成字符串,发送给channel中,也就是发送给客户端
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
这里逻辑有点绕,应该很多人不理解为什么调用下一个handler的reply方法会返回一个CompletionStage对象,因为这里是要对同步请求和异步请求进行处理,我们继续往下看。
1.调用ExchangeHandlerAdapter.reply
方法
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
//非主线代码
// 转成Invocation对象,要开始用反射执行方法了
Invocation inv = (Invocation) message;
//拿到当前调用服务的invoker对象
Invoker<?> invoker = getInvoker(channel, inv);
// 这里设置了,service中才能拿到remoteAddress
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 执行服务,得到结果
Result result = invoker.invoke(inv);
// 返回一个CompletableFuture
return result.completionFuture().thenApply(Function.identity());
2.调用服务提供者具体方法,封装返回对象
invoker对象是我们服务导出的时候,基于SPI机制生成的,默认是由JavassistProxyFactory
生成,我们看下JavassistProxyFactory.getInvoker
方法
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 如果现在被代理的对象proxy本身就是一个已经被代理过的对象,那么则取代理类的Wrapper,否则取type(接口)的Wrapper
// Wrapper是针对某个类或某个接口的包装类,通过wrapper对象可以更方便的去执行某个类或某个接口的方法
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// proxy是服务实现类
// type是服务接口
// url是一个注册中心url,但同时也记录了
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 执行proxy的method方法
// 执行的proxy实例的方法
// 如果没有wrapper,则要通过原生的反射技术去获取Method对象,然后执行
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
这里会new出来AbstractProxyInvoker的子类,所以执行ExchangeHandlerAdapter.reply
的时候
会来到AbstractProxyInvoker.invoke
方法
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
// 执行服务,得到一个接口,可能是一个CompletableFuture(表示异步调用),可能是一个正常的服务执行结果(同步调用)
// 如果是同步调用会阻塞,如果是异步调用不会阻塞
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
// 将同步调用的服务执行结果封装为CompletableFuture类型
CompletableFuture<Object> future = wrapWithFuture(value, invocation);
// 异步RPC结果
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
//设置一个回调,如果是异步调用,那么服务执行完成后将执行这里的回调
// 不会阻塞
future.whenComplete((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
// 将服务执行完之后的结果设置到异步RPC结果对象中
asyncRpcResult.complete(result);
});
// 返回异步RPC结果
return asyncRpcResult;
} catch (InvocationTargetException e) {
// 假设抛的NullPointException,那么会把这个异常包装为一个Result对象
if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
// 只会抛出RpcException,其他异常都会被包装成Result对象
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
1.调用子类的doInvoke方法去调用服务提供者真正的方法
2.无论是同步还是异步,把调用结果都封装为一个CompletableFuture类型
3.通过CompletableFuture,如果是异步请求也不会阻塞,而是通过CompletableFuture的whenComplete方法设置回调,无论调用服务时是否有异常,都会把请求结果封装为AppResponse,最后返回我们的asyncRpcResult。
接着我们再回来ExchangeHandlerAdapter.reply
方法
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
//非主线代码
// 转成Invocation对象,要开始用反射执行方法了
Invocation inv = (Invocation) message;
//拿到当前调用服务的invoker对象
Invoker<?> invoker = getInvoker(channel, inv);
// 这里设置了,service中才能拿到remoteAddress
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 执行服务,得到结果
Result result = invoker.invoke(inv);
// 返回一个CompletableFuture
return result.completionFuture().thenApply(Function.identity());
3.响应结果
无论是同步方法还是异步方法,封装好CompletableFuture后,接着回到 handleRequest(exchangeChannel, request)
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
// 请求id,请求版本
Response res = new Response(req.getId(), req.getVersion());
//忽略非主线代码
//获取 data 字段值,也就是 RpcInvocation 对象,表示请求内容
Object msg = req.getData();
try {
// 继续向下调用,分异步调用和同步调用,如果是同步则会阻塞,如果是异步则不会阻塞
CompletionStage<Object> future = handler.reply(channel, msg); // 异步执行服务
// 如果是同步调用则直接拿到结果,并发送到channel中去
// 如果是异步调用则会监听,直到拿到服务执行结果,然后发送到channel中去
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
// 服务执行过程中出现了异常,则把Throwable转成字符串,发送给channel中,也就是发送给客户端
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
这里拿到我们的appResult,无论是否有异常,通过channel.send
返回给我们的客户端,当然这里有一个循环,通过channel.send,上边的handler部分还会调用,判断是否是一个响应,然后对服务提供者的响应进行处理。
下边对我们提供端处理请求画一个流程图:
其实上述流程在调用AbstractProxyInvoker.invoke
方法之前,是需要执行过滤器的逻辑的,过滤器是在哪里设置的呢?是通过SPI机制的AOP功能来增强实现的,在执行服务导出DubboProtocol.export
方法之前,会先执行Protocol接口包装类的的export方法
这里主要说下ProtocolFilterWrapper.export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
接着看buildInvokerChain
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 根据url获取filter,根据url中的parameters取key为key的value所对应的filter,但是还会匹配group
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
// ConsumerContextFilter--->FutureFilter--->MonitorFilter
// ConsumerContextFilter用来设置RpcContext
//
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
...
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// 得到一个异步结果
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// onError callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
return asyncResult;
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
最终会返回一个CallbackRegistrationInvoker
所以我们执行过滤器链的方法在CallbackRegistrationInvoker.invoker
@Override
public Result invoke(Invocation invocation) throws RpcException {
// 执行过滤器链
Result asyncResult = filterInvoker.invoke(invocation);
// 过滤器都执行完了之后,回调每个过滤器的onResponse或onError方法
asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
for (int i = filters.size() - 1; i >= 0; i--) {
Filter filter = filters.get(i);
// onResponse callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} else {
filter.onResponse(r, filterInvoker, invocation);
}
}
});
return asyncResult;
}
过滤器链为
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
}
return invoker.invoke(inv);
}
处理Dubbo的回声检测。
如果判断到是$echo方法,则直接构造一个结果返回:
用于设置类加载器,没什么特别的
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
ClassLoader ocl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
try {
return invoker.invoke(invocation);
} finally {
Thread.currentThread().setContextClassLoader(ocl);
}
}
泛化服务的处理.不是重点,这里就不贴代码了。
设置Dubbo的RpcContext的内容,不是重点也不贴代码了。
处理trace命令相关的逻辑
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
invocation.setAttachment(TIMEOUT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
return invoker.invoke(invocation);
}
用于处理服务提供端超时处理的情况,这里设置下调用服务的时间,在过滤器调用完,返回响应的时候会调用onResponse方法
String startAttach = invocation.getAttachment(TIMEOUT_FILTER_START_TIME);
if (startAttach != null) {
long elapsed = System.currentTimeMillis() - Long.valueOf(startAttach);
if (invoker.getUrl() != null && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE)) {
if (logger.isWarnEnabled()) {
logger.warn("invoke time out. method: " + invocation.getMethodName() + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
}
}
}
这里就是拿到配置timeout的时间判断是否超时,对于服务端超时来说,只是打印一个日志处理。
和监控相关的逻辑,非重点,不贴代码了。
处理异常信息的Filter,主要是响应的回调会对异常进行处理
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
try {
Throwable exception = appResponse.getException();
// directly throw if it's checked exception
// 如果是checked异常,直接抛出
if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
return;
}
// directly throw if the exception appears in the signature
// 在方法签名上有声明,直接抛出
try {
Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
Class<?>[] exceptionClassses = method.getExceptionTypes();
for (Class<?> exceptionClass : exceptionClassses) {
if (exception.getClass().equals(exceptionClass)) {
return;
}
}
} catch (NoSuchMethodException e) {
return;
}
// for the exception not found in method's signature, print ERROR message in server's log.
// 未在方法签名上定义的异常,在服务器端打印ERROR日志
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
// directly throw if exception class and interface class are in the same jar file.
// 异常类和接口类在同一jar包里,直接抛出
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return;
}
// directly throw if it's JDK exception
// 是JDK自带的异常,直接抛出
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {
return;
}
// directly throw if it's dubbo exception
// 是Dubbo本身的异常,直接抛出
if (exception instanceof RpcException) {
return;
}
// otherwise, wrap with RuntimeException and throw back to the client
// 否则,包装成RuntimeException抛给客户端 TulingExexp
appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
return;
} catch (Throwable e) {
logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
return;
}
}
}
这里主要是针对服务提供者抛出的异常是自定义的,而服务消费端没有这个异常导致消费端无法处理
以下五种情况都不会处理
1.检查时异常
2.java异常
3.Dubbo自带的异常 RpcException
4.throws抛出的异常
5.异常和接口在同一个jar包内
其他情况会把异常信息进行toString,包装为一个RuntimeException添加到响应中。
至此我们的一个服务提供者服务的请求处理完美结束
这就需要前置知识了,服务导出的时候我们知道最终会根据Invoker对象,基于SPI机制生成一个代理对象
JavassistProxyFactory.getProxy
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
当真正进行服务调用的时候就会调用InvokerInvocationHandler.invoker
方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// 这里的recreate方法很重要,他会调用AppResponse的recreate方法,
// 如果AppResponse对象中存在exception信息,则此方法中会throw这个异常
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
在这里会构建一个RpcInvocation,执行invoker对象的invoke方法,现在的invoker对象具体对象的哪个呢?可以看下图,服务导出生成的对象链结构
根据上图所示,动态代理的invoker对象其实是MockClusterInvoker对象
,在MockClusterInvoker.invoker
方法中就是执行我们的mock逻辑,这里不是重点,就不详细讲了.
假设我们只设置了一个注册中心,那么接下来就会FailFailoverClusterInvoker.invoker方法
,但是FailFailoverClusterInvoker
并没有实现invoker方法,而是会调用父类==AbstractClusterInvoker
==的invoker方法
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
list方法其实的调用我们动态目录的list方法,进行路由的,最终会来到我们RegistryDirectory.doList
方法
@Override
public List<Invoker<T>> doList(Invocation invocation) {
List<Invoker<T>> invokers = null;
try {
// Get invokers from cache, only runtime routers will be executed.
invokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
在路由链中有我们所有的Invoker对象,通过路由规则先过滤掉一部分的Invoker
在服务路由之后呢,会拿到我们具体的负载均衡器,默认是RandomLoadBalance
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
return doInvoke(invocation, invokers, loadbalance);
接着就继续调用我们FailFailoverClusterInvoker.doInvoke方法
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
}
1.查询重试次数,循环调用服务
2.调用select方法进行负载均衡,获得一个最终的执行对象invoker
3.执行最终执行对象invoker对象的invoker方法
Result result = invoker.invoke(invocation);
设置Rpc的Context上下文属性。
实现事件通知相关功能,可以借此完成一些通知功能,在调用方法之前,调用方法之后,出现异常时,会触发对应的事件。
实现Dubbo监控相关逻辑。
当过滤器链的相关方法执行完毕后,会来到AsyncToSyncInvoker.invoke方法
@Override
public Result invoke(Invocation invocation) throws RpcException {
// 异步转同步
Result asyncResult = invoker.invoke(invocation); // AsyncRpcResult--->CompletableFuture
try {
// 如果invocation指定是同步的,则阻塞等待结果
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
}
return asyncResult;
}
如果没有特殊设置,Netty发生数据是非阻塞式的,如果我们是一个同步方法的话,就需要把阻塞到这里,直到服务提供者返回响应,以下代码就是把我们异步转为同步的核心逻辑,asyncResult其实是一个CompletableFuture
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
继续追踪invoker.invoke(invocation)
方法,此时的invoker对象就是DubboInvoker对象了,所以我们直接看DubboInvoker.invoke
方法
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
// 异步去请求,得到一个CompletableFuture
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
// responseFuture会完成后会调用asyncRpcResult中的方法,这里并不会阻塞,如果要达到阻塞的效果在外层使用asyncRpcResult去控制
asyncRpcResult.subscribeTo(responseFuture);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
1.创建一个AsyncRpcResult
对象
2.调用 currentClient.request(inv, timeout)
方法发生请求返回一个CompletableFuture
3.通过 asyncRpcResult.subscribeTo(responseFuture)
方法,将responseFuture和asyncRpcResult绑定在一起,当responseFuture
的方法执行完成后,会调用AsyncRpcResult
的方法
接着看 currentClient.request(inv, timeout)
这个方法,这里还是数据交换层的逻辑,最终会来到HeaderExchangeChannel.request方法
@Override
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
1.创建一个DefaultFuture
对象
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
// timeout check
timeoutCheck(future);
return future;
}
在创建DefaultFuture
对象的逻辑中
1.会把请求id跟DefaultFuture的关系,以及请求id跟channel的关系保存到map中:
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
2.启动定时任务,判断调用是否超时
private static void timeoutCheck(DefaultFuture future) {
TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}
@Override
public void run(Timeout timeout) {
DefaultFuture future = DefaultFuture.getFuture(requestID);
if (future == null || future.isDone()) {
return;
}
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse, true);
}
那么当我们服务提供者处理完请求后,是如何依次通知DefaultFuture --> CompletableFuture --> AsyncRpcResult的呢?
当我们服务提供者响应结果后,也是需要进行Netty传输,最后会来到HeaderExchangeHandler#received方法
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
//非主线代码
else if (message instanceof Response) {
handleResponse(channel, (Response) message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
如果是响应会调用handleResponse
方法,最终会来到DefaultFuture.received
方法
public static void received(Channel channel, Response response, boolean timeout) {
try {
// response的id,
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
t.cancel();
}
future.doReceived(response);
}
} finally {
CHANNELS.remove(response.getId());
}
}
可以看出收起根据响应id从map中找到当前请求的DefaultFuture,然后把响应结果赋值给它
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}
至此异步转换为同步处理完成。
思考一个问题,如果我们消费端调用服务端,服务端出现异常,Dubbo会怎么处理呢?我们可以追踪下服务调用最开始的地方FailoverClusterInvoker.doInvoke
方法
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
如果出现异常,Dubbo是采用重试机制进行服务重调,默认是重试2次,但是并不是所有异常都会重试
@Override
public String sayHello(String name) throws InterruptedException {
logger.info("Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
int i = 1/0;
return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
}
我们看下生产者日志:
我们再看下消费者日志:
很显然,非RPC异常直接不会重试
服务提供端设置timeOut=4000,实际执行实际为5000
@Override
public String sayHello(String name) throws InterruptedException {
logger.info("Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
Thread.sleep(5000);
return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
}
生产者日志:
虽然超过了超时时间,只是做了日志的打印,没有抛出异常,而且很明显重试了两次,一共进行了3次调用
消费者日志:
很明显抛出的是一个RPC异常,而且是消费端抛出的,然后进行了两次重试。
服务重连机制肯定是由定时任务定时扫描处理的,这一块其实是在服务引入创建交换层客户端HeaderExchangeClient
的时候创建的
public HeaderExchangeClient(Client client, boolean startTimer) {
Assert.notNull(client, "Client can't be null");
this.client = client;
this.channel = new HeaderExchangeChannel(client);
if (startTimer) {
URL url = client.getUrl();
startReconnectTask(url);
startHeartBeatTask(url);
}
}
在这里 startReconnectTask(url)
就是开启服务重连的定时任务
private void startReconnectTask(URL url) {
if (shouldReconnect(url)) {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
int idleTimeout = getIdleTimeout(url); // 心跳时间的两倍
long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
this.reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);
IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}
}
@Override
protected void doTask(Channel channel) {
try {
System.out.println("重连任务");
Long lastRead = lastRead(channel);
Long now = now();
// Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection
if (!channel.isConnected()) {
try {
logger.info("Initial connection to " + channel);
((Client) channel).reconnect();
} catch (Exception e) {
logger.error("Fail to connect to " + channel, e);
}
// check pong at client
} else if (lastRead != null && now - lastRead > idleTimeout) {
logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
+ idleTimeout + "ms");
try {
((Client) channel).reconnect();
} catch (Exception e) {
logger.error(channel + "reconnect failed during idle time.", e);
}
}
} catch (Throwable t) {
logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
}
}
在这里判断Netty连接是否是好的,如果连接断了那么调用reconnect
方法进行重新连接
心跳机制肯定是由定时任务定时扫描处理的,这一块其实是在服务引入创建交换层客户端HeaderExchangeClient
的时间创建的
public HeaderExchangeClient(Client client, boolean startTimer) {
Assert.notNull(client, "Client can't be null");
this.client = client;
this.channel = new HeaderExchangeChannel(client);
if (startTimer) {
URL url = client.getUrl();
startReconnectTask(url);
startHeartBeatTask(url);
}
}
在 startHeartBeatTask(url)
方法中就是开启定时任务进行心跳检测
private void startHeartBeatTask(URL url) {
if (!client.canHandleIdle()) {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
int heartbeat = getHeartbeat(url);
long heartbeatTick = calculateLeastDuration(heartbeat);
this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
}
}
可以看出来这里开启心跳检测是有条件的,而对于Netty来说
直接返回true,那么这里的逻辑就不会执行,这是因为Netty本身就有心跳机制,就不需要交换层提供心跳机制检测了。