(1)消费端发起TCP连接后,服务提供方的NettyServer的connected方法将被调用;
(2)因为Netty默认的线程模型为All,因此AllChannelHandler类把接收到的所有消息(包括请求事件、响应事件、连接事件、断开事件,心跳事件等)包装成ChannelEventRunnable任务,并将其投递到线程池中;
(3)接着执行线程池中的任务,并最终将调用DubboProtocol的connected方法。
消费端发起TCP连接后,服务提供方的NettyServer的connected方法将被调用。connected方法为NettyServer父类AbstractServer的connected方法。
其中的依次调用关系为:AbstractServer的connected()->AbstractPeer的connected()->ChannelHandler的connected()。具体实现如下所示。
(1)AbstractServer的connected()
public void connected(Channel ch) throws RemotingException {
// If the server has entered the shutdown process, reject any new connection
if (this.isClosing() || this.isClosed()) {
logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
ch.close();
return;
}
if (accepts > 0 && getChannelsSize()> accepts) {
logger.error(INTERNAL_ERROR, "unknown error in remoting module", "", "Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
ch.close();
return;
}
super.connected(ch);
}
(2)AbstractPeer的connected()
private final ChannelHandler handler;
public void connected(Channel ch) throws RemotingException {
if (closed) {
return;
}
handler.connected(ch);
}
调用ChannelHandler的connected()时,因为Netty默认的线程模型为All,因此AllChannelHandler类(ChannelHandler的子类)把接收到的所有消息包装成ChannelEventRunnable任务,并将其投递到线程池中。具体实现如下所示。
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService executor = getSharedExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
}
}
}
执行线程池中的任务时,将执行ChannelEventRunnable的run方法,其实现细节具体如下所示。
public void run() {
InternalThreadLocalMap internalThreadLocalMap = InternalThreadLocalMap.getAndRemove();
try {
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "unknown state: " + state + ", message is " + message);
}
}
} finally {
InternalThreadLocalMap.set(internalThreadLocalMap);
}
}
执行handler.connected(channel)时,将调用HeaderExchangeHandler#connected方法,具体实现如下所示。
public void connected(Channel channel) throws RemotingException {
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
handler.connected(exchangeChannel);
channel.setAttribute(Constants.CHANNEL_SHUTDOWN_TIMEOUT_KEY,
ConfigurationUtils.getServerShutdownTimeout(channel.getUrl().getOrDefaultApplicationModel()));
}
接着在执行handler.connected(exchangeChannel)时,将调用DubboProtocol#connected方法,实现如下所示。
public void connected(Channel channel) throws RemotingException {
invoke(channel, ON_CONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
if (Boolean.TRUE.toString().equals(invocation.getAttachment(STUB_EVENT_KEY))) {
tryToGetStubService(channel, invocation);
}
received(channel, invocation);
} catch (Throwable t) {
logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", "Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
执行接口请求最终将调用DubboProtocol#reply方法,具体实现如下所示。
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
// 1、获取调用方法对应的Invoker
Invoker<?> invoker = inv.getInvoker() == null ? getInvoker(channel, inv) : inv.getInvoker();
// switch TCCL
if (invoker.getUrl().getServiceModel() != null) {
Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());
}
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
// 2、获取上下文对象,并设置对端地址
RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
// 3、执行invoker调用链
Result result = invoker.invoke(inv);
// 4、返回结果
return result.thenApply(Function.identity());
}