This article takes a look at AsyncHttpClient's ListenableFuture.
org/asynchttpclient/ListenableFuture.java
public interface ListenableFuture<V> extends Future<V> {
/**
* Terminate and if there is no exception, mark this Future as done and release the internal lock.
*/
void done();
/**
* Abort the current processing, and propagate the {@link Throwable} to the {@link AsyncHandler} or {@link Future}
*
* @param t the exception
*/
void abort(Throwable t);
/**
* Touch the current instance to prevent external service to times out.
*/
void touch();
/**
* Adds a listener and executor to the ListenableFuture.
* The listener will be {@linkplain java.util.concurrent.Executor#execute(Runnable) passed
* to the executor} for execution when the {@code Future}'s computation is
* {@linkplain Future#isDone() complete}.
* <br>
* Executor can be <code>null</code>, in that case executor will be executed
* in the thread where completion happens.
* <br>
* There is no guaranteed ordering of execution of listeners, they may get
* called in the order they were added and they may get called out of order,
* but any listener added through this method is guaranteed to be called once
* the computation is complete.
*
* @param listener the listener to run when the computation is complete.
* @param exec the executor to run the listener in.
* @return this Future
*/
ListenableFuture<V> addListener(Runnable listener, Executor exec);
CompletableFuture<V> toCompletableFuture();
//......
}
ListenableFuture extends java.util.concurrent.Future and adds the done, abort, touch, addListener, and toCompletableFuture methods.
org/asynchttpclient/ListenableFuture.java
class CompletedFailure<T> implements ListenableFuture<T> {
private final ExecutionException e;
public CompletedFailure(Throwable t) {
e = new ExecutionException(t);
}
public CompletedFailure(String message, Throwable t) {
e = new ExecutionException(message, t);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return true;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public T get() throws ExecutionException {
throw e;
}
@Override
public T get(long timeout, TimeUnit unit) throws ExecutionException {
throw e;
}
@Override
public void done() {
}
@Override
public void abort(Throwable t) {
}
@Override
public void touch() {
}
@Override
public ListenableFuture<T> addListener(Runnable listener, Executor exec) {
if (exec != null) {
exec.execute(listener);
} else {
listener.run();
}
return this;
}
@Override
public CompletableFuture<T> toCompletableFuture() {
CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
}
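CompletedFailure implements ListenableFuture as a future that has already failed: cancel returns true, isCancelled returns false, isDone returns true, and both get overloads throw the stored ExecutionException. done, abort, and touch are no-ops, and addListener runs the listener right away, on exec if one is given and on the calling thread otherwise.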
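One detail that follows from the code above is worth spelling out: get() throws the stored ExecutionException directly, while toCompletableFuture() completes a new CompletableFuture exceptionally with that same ExecutionException, and CompletableFuture.get() then wraps it once more. The JDK-only sketch below reproduces just that wrapping. FailedFutureSketch and wrapperDepth are hypothetical names used for illustration, not part of AsyncHttpClient.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in that mirrors CompletedFailure.toCompletableFuture(); not the library class.
final class FailedFutureSketch {

    // Wrap the cause in an ExecutionException and complete a fresh future with it,
    // as the constructor and toCompletableFuture() above do.
    static CompletableFuture<String> failed(Throwable cause) {
        ExecutionException stored = new ExecutionException(cause);
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(stored);
        return future;
    }

    // Counts how many exceptions sit above the root cause when get() fails.
    // Returns -1 if get() succeeds or the root cause is not in the chain.
    static int wrapperDepth(CompletableFuture<?> future, Throwable root) {
        try {
            future.get();
            return -1;
        } catch (ExecutionException | InterruptedException e) {
            int depth = 0;
            Throwable current = e;
            while (current != null && current != root) {
                depth++;
                current = current.getCause();
            }
            return current == root ? depth : -1;
        }
    }

    public static void main(String[] args) {
        Throwable root = new IllegalStateException("connect failed");
        CompletableFuture<String> future = failed(root);
        System.out.println("done=" + future.isDone());
        System.out.println("wrappers above root=" + wrapperDepth(future, root));
    }
}
```

Callers that unwrap getCause() exactly once may therefore see an ExecutionException rather than the original Throwable when they go through toCompletableFuture().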
org/asynchttpclient/netty/NettyResponseFuture.java
public final class NettyResponseFuture<V> implements ListenableFuture<V> {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "redirectCount");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "currentRetry");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_DONE_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "isDone");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_CANCELLED_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "isCancelled");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_AUTH_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "inAuth");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_PROXY_AUTH_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "inProxyAuth");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CONTENT_PROCESSED_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "contentProcessed");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> ON_THROWABLE_CALLED_FIELD = AtomicIntegerFieldUpdater
.newUpdater(NettyResponseFuture.class, "onThrowableCalled");
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> TIMEOUTS_HOLDER_FIELD = AtomicReferenceFieldUpdater
.newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> PARTITION_KEY_LOCK_FIELD = AtomicReferenceFieldUpdater
.newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");
private final long start = unpreciseMillisTime();
private final ChannelPoolPartitioning connectionPoolPartitioning;
private final ConnectionSemaphore connectionSemaphore;
private final ProxyServer proxyServer;
private final int maxRetry;
private final CompletableFuture<V> future = new CompletableFuture<>();
//......
@Override
public V get() throws InterruptedException, ExecutionException {
return future.get();
}
@Override
public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException, ExecutionException {
return future.get(l, tu);
}
}
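NettyResponseFuture implements ListenableFuture; both get overloads delegate to an internal CompletableFuture. The methods examined below all belong to NettyResponseFuture.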
public final void done() {
if (terminateAndExit())
return;
try {
loadContent();
} catch (ExecutionException ignored) {
} catch (RuntimeException t) {
future.completeExceptionally(t);
} catch (Throwable t) {
future.completeExceptionally(t);
throw t;
}
}
private boolean terminateAndExit() {
releasePartitionKeyLock();
cancelTimeouts();
this.channel = null;
this.reuseChannel = false;
return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
}
private void loadContent() throws ExecutionException {
if (future.isDone()) {
try {
future.get();
} catch (InterruptedException e) {
throw new RuntimeException("unreachable", e);
}
}
// No more retry
CURRENT_RETRY_UPDATER.set(this, maxRetry);
if (CONTENT_PROCESSED_FIELD.getAndSet(this, 1) == 0) {
try {
future.complete(asyncHandler.onCompleted());
} catch (Throwable ex) {
if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
try {
try {
asyncHandler.onThrowable(ex);
} catch (Throwable t) {
LOGGER.debug("asyncHandler.onThrowable", t);
}
} finally {
cancelTimeouts();
}
}
future.completeExceptionally(ex);
}
}
future.getNow(null);
}
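done first calls terminateAndExit, which releases the partition key lock, cancels the timeouts, and clears the channel; it returns true when the future was already done or cancelled, in which case done returns immediately. Otherwise done calls loadContent. If the internal future is already complete, loadContent calls future.get(), so a stored failure surfaces as an ExecutionException, which done ignores. It then sets the retry count to maxRetry and, if the content has not been processed yet, completes the future with the result of asyncHandler.onCompleted(). If that callback throws, asyncHandler.onThrowable is invoked (at most once) and the future is completed exceptionally.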
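The "first caller wins" guard in terminateAndExit can be reproduced with the JDK alone. The following is a minimal sketch, assuming only the IS_DONE_FIELD.getAndSet(this, 1) check matters; DoneOnceSketch and its Supplier callback are hypothetical stand-ins for NettyResponseFuture and AsyncHandler.onCompleted(), and the lock, timeout, and channel handling is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;

// Hypothetical, simplified model of done(); not the library class.
final class DoneOnceSketch<V> {
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<DoneOnceSketch> IS_DONE =
            AtomicIntegerFieldUpdater.newUpdater(DoneOnceSketch.class, "isDone");

    private volatile int isDone; // 0 = running, 1 = done
    private final CompletableFuture<V> future = new CompletableFuture<>();
    private final Supplier<V> onCompleted; // plays the role of asyncHandler.onCompleted()

    DoneOnceSketch(Supplier<V> onCompleted) {
        this.onCompleted = onCompleted;
    }

    void done() {
        // getAndSet returns the previous value, so only the first caller sees 0.
        if (IS_DONE.getAndSet(this, 1) != 0) {
            return;
        }
        try {
            future.complete(onCompleted.get());
        } catch (RuntimeException ex) {
            future.completeExceptionally(ex);
        }
    }

    CompletableFuture<V> future() {
        return future;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        DoneOnceSketch<String> sketch = new DoneOnceSketch<>(() -> {
            calls[0]++;
            return "body";
        });
        sketch.done();
        sketch.done(); // second call is a no-op
        System.out.println("callback invocations=" + calls[0]);
        System.out.println("result=" + sketch.future().getNow(null));
    }
}
```

Using an AtomicIntegerFieldUpdater on a volatile int, as the original class does, avoids allocating a separate AtomicInteger object per future.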
public final void abort(final Throwable t) {
if (terminateAndExit())
return;
future.completeExceptionally(t);
if (ON_THROWABLE_CALLED_FIELD.compareAndSet(this, 0, 1)) {
try {
asyncHandler.onThrowable(t);
} catch (Throwable te) {
LOGGER.debug("asyncHandler.onThrowable", te);
}
}
}
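abort likewise returns immediately when terminateAndExit returns true. Otherwise it completes the future exceptionally with t and then, if onThrowable has not been called yet (a compareAndSet on ON_THROWABLE_CALLED_FIELD), invokes asyncHandler.onThrowable(t). An exception thrown by the handler is only logged at debug level.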
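To make the ordering in abort concrete, here is a small JDK-only sketch: the future is failed first, the handler callback is guarded by a compare-and-set, and a handler that throws does not escape abort. AbortSketch and its Consumer callback are hypothetical stand-ins, and AtomicBoolean replaces the field updaters purely for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical, simplified model of abort(); not the library class.
final class AbortSketch {
    private final AtomicBoolean terminated = new AtomicBoolean();
    private final AtomicBoolean onThrowableCalled = new AtomicBoolean();
    private final CompletableFuture<String> future = new CompletableFuture<>();
    private final Consumer<Throwable> onThrowable; // plays the role of asyncHandler.onThrowable

    AbortSketch(Consumer<Throwable> onThrowable) {
        this.onThrowable = onThrowable;
    }

    void abort(Throwable t) {
        if (terminated.getAndSet(true)) {
            return; // already done or aborted
        }
        future.completeExceptionally(t);
        if (onThrowableCalled.compareAndSet(false, true)) {
            try {
                onThrowable.accept(t);
            } catch (Throwable handlerFailure) {
                // The original logs this at debug level; the sketch simply swallows it.
            }
        }
    }

    CompletableFuture<String> future() {
        return future;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        AbortSketch sketch = new AbortSketch(t -> {
            calls[0]++;
            throw new IllegalStateException("handler bug");
        });
        sketch.abort(new RuntimeException("timeout"));
        sketch.abort(new RuntimeException("second abort")); // ignored
        System.out.println("failed=" + sketch.future().isCompletedExceptionally());
        System.out.println("handler invocations=" + calls[0]);
    }
}
```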
public void touch() {
touch = unpreciseMillisTime();
}
touch updates the touch field with the current timestamp.
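The excerpt does not show who reads the touch field. The sketch below assumes that some timeout check compares the timestamp against a read timeout, which is what the interface Javadoc ("prevent external service to times out") suggests. TouchSketch, isIdleExpired, and the explicit clock parameters are all hypothetical and chosen so the example is deterministic.

```java
// Hypothetical model of a "last touched" timestamp; not the library class.
final class TouchSketch {
    private volatile long lastTouch;

    TouchSketch(long nowMillis) {
        this.lastTouch = nowMillis;
    }

    // Equivalent in spirit to touch(): remember when activity was last seen.
    void touch(long nowMillis) {
        lastTouch = nowMillis;
    }

    // Assumed check: the request counts as idle once no activity
    // has been recorded for readTimeoutMillis.
    boolean isIdleExpired(long nowMillis, long readTimeoutMillis) {
        return nowMillis - lastTouch >= readTimeoutMillis;
    }

    public static void main(String[] args) {
        TouchSketch sketch = new TouchSketch(1_000L);
        sketch.touch(1_900L); // activity observed, so the idle window restarts
        System.out.println("expired at 2500? " + sketch.isIdleExpired(2_500L, 1_000L));
        System.out.println("expired at 2900? " + sketch.isIdleExpired(2_900L, 1_000L));
    }
}
```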
public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
if (exec == null) {
exec = Runnable::run;
}
future.whenCompleteAsync((r, v) -> listener.run(), exec);
return this;
}
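addListener registers the listener with future.whenCompleteAsync((r, v) -> listener.run(), exec). When exec is null it is replaced with Runnable::run, so the listener runs on the thread that completes the future. The listener fires on both normal and exceptional completion; the second lambda parameter (named v in the source) is the Throwable.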
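The behaviour of the null-executor fallback can be checked with a plain CompletableFuture. This sketch copies the body of addListener into a static helper; AddListenerSketch is a hypothetical name, and everything else is standard JDK API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical helper mirroring NettyResponseFuture.addListener; not the library class.
final class AddListenerSketch {

    static <V> void addListener(CompletableFuture<V> future, Runnable listener, Executor exec) {
        // A null executor becomes a "direct" executor that runs the task inline.
        Executor effective = (exec != null) ? exec : Runnable::run;
        // The callback ignores both result and error, so it fires on success and on failure.
        future.whenCompleteAsync((result, error) -> listener.run(), effective);
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();

        CompletableFuture<String> pending = new CompletableFuture<>();
        addListener(pending, () -> events.add("pending-listener"), null);
        events.add("before-complete");
        pending.complete("ok"); // listener runs here, on the completing thread

        CompletableFuture<String> alreadyDone = CompletableFuture.completedFuture("ok");
        addListener(alreadyDone, () -> events.add("late-listener"), null); // runs immediately

        System.out.println(events);
    }
}
```

Because the direct executor runs the listener on whichever thread completes the future, a slow listener registered without an executor would block that thread; passing a dedicated Executor avoids this.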
public CompletableFuture<V> toCompletableFuture() {
return future;
}
toCompletableFuture returns the internal future directly, without copying or wrapping it.
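Since the returned object is an ordinary CompletableFuture, it can be composed with the usual JDK combinators. The sketch below is an illustration only: ComposeSketch and describe are hypothetical, and an Integer status code stands in for a real response object. It also shows that a dependent stage sees the failure wrapped in a CompletionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical composition example using only JDK types.
final class ComposeSketch {

    static CompletableFuture<String> describe(CompletableFuture<Integer> statusFuture) {
        return statusFuture
                .thenApply(status -> "status=" + status)
                .exceptionally(error -> {
                    // Downstream stages receive the original failure wrapped in CompletionException.
                    Throwable root = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause()
                            : error;
                    return "failed: " + root.getMessage();
                });
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = CompletableFuture.completedFuture(200);
        System.out.println(describe(ok).join());

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("connection reset"));
        System.out.println(describe(failed).join());
    }
}
```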
org/asynchttpclient/netty/request/NettyRequestSender.java
private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request,
AsyncHandler<T> asyncHandler,
NettyRequest nettyRequest,
ProxyServer proxyServer) {
NettyResponseFuture<T> future = new NettyResponseFuture<>(
request,
asyncHandler,
nettyRequest,
config.getMaxRequestRetry(),
request.getChannelPoolPartitioning(),
connectionSemaphore,
proxyServer);
String expectHeader = request.getHeaders().get(EXPECT);
if (HttpHeaderValues.CONTINUE.contentEqualsIgnoreCase(expectHeader))
future.setDontWriteBodyBecauseExpectContinue(true);
return future;
}
private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,
AsyncHandler<T> asyncHandler,
NettyResponseFuture<T> future,
ProxyServer proxyServer,
boolean performConnectRequest) {
NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,
performConnectRequest);
Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);
return Channels.isChannelActive(channel)
? sendRequestWithOpenChannel(newFuture, asyncHandler, channel)
: sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
}
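NettyRequestSender's newNettyResponseFuture creates a NettyResponseFuture and, when the request carries an Expect: 100-continue header, marks it so that the body is not written up front. sendRequestWithCertainForceConnect obtains a NettyResponseFuture from newNettyRequestAndResponseFuture (not shown here) and passes it to sendRequestWithOpenChannel if an active channel is available, or to sendRequestWithNewChannel otherwise.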
AsyncHttpClient's ListenableFuture extends java.util.concurrent.Future and adds the done, abort, touch, addListener, and toCompletableFuture methods. It has two implementations, CompletedFailure and NettyResponseFuture. NettyRequestSender creates the NettyResponseFuture and, in sendRequestWithCertainForceConnect, hands it to sendRequestWithOpenChannel or sendRequestWithNewChannel to send the request.