小议CompletableFuture 链式执行和响应式编程

发布时间:2024年01月12日

相关文章:

背景

昨晚和一个朋友讨论了他在开发过程中遇到的一个场景设计问题。这个场景可以简化为:服务接收到一个需要处理的任务请求,然后立即返回。这个任务需要经过四个处理器的处理,每个处理器的处理都依赖于前一个处理器的处理结果。

这个场景与我最近处理 GitLab WebHook 的工作流程非常相似。例如,我从 GitLab 接收到一个事件后,需要对数据进行清洗、解析、调用 AI 大模型和发布评论等操作,每个操作都依赖于前一个操作的结果。

对于这种场景的设计,我目前采用的是基于链式设计的方法。我定义了一个抽象的处理器类和链式工厂:

/**
 * @author dongguabai
 * @date 2024-01-09 13:52
 */
public abstract class MergeRequestHandler {

    private MergeRequestHandler next;

    public MergeRequestHandler(MergeRequestHandler next) {
        this.next = next;
    }

    public void handle(Task task) {
        if (run(task) && next != null) {
            next.handle(task);
        }
    }

    public abstract boolean run(Task task);
}
/**
 * @author dongguabai
 * @date 2024-01-09 15:53
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class MergeRequestHandlerFactory {

    public static MergeRequestHandler createHandlerChain() {
        return new ActionHandler(new MergeRequestRetrievalHandler(new CommentHandler(new FinalHandler())));
    }
}

在链式工厂里面指定了执行顺序。

当然,也可以参考 ZooKeeper 的实现:org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors

@Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                finalProcessor, getLeader().toBeApplied);
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false);
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }

然后,朋友提出要将其改为异步处理。实际上,异步处理也是可以很简单实现的,只需在执行 HandlerChain 时将其放入线程池中即可。但在这里,我完全可以使用 CompletableFuture 来实现,这也是我当前代码改造的主要方向。

关于CompletableFuture的实现,我设计了两套方案。现在来看一下最终实现的方案。

抽象 Processor

@Log4j
abstract class Processor {

    public CompletableFuture<Task> process(Task task, Executor executor) {
        return CompletableFuture.supplyAsync(() -> doProcess(task), executor);
    }

    protected abstract Task doProcess(Task task);
}

实现类:

@Log4j
class Processor1 extends Processor {
    @Override
    protected Task doProcess(Task task) {
        // 你的处理逻辑
        log.info("Processor1 is processing");
        ProcessorChain.sleep(1);
        log.info("Processor1 end");
        if (task.isCancelled()) {
            throw new CancellationException("Task was cancelled");
        }
        return task;
    }
}
@Log4j
class Processor2 extends Processor {
    @Override
    protected Task doProcess(Task task) {
        // 你的处理逻辑
        log.info("Processor2 is processing");
        ProcessorChain.sleep(2);
        log.info("Processor2 end");
        if (task.isCancelled()) {
            throw new CancellationException("Task was cancelled");
        }
        return task;
    }
}

核心的处理链:

@Log4j
class ProcessorChain {
    private final List<Processor> processors;

    public ProcessorChain(List<Processor> processors) {
        this.processors = processors;
    }

    public CompletableFuture<Void> process(Task task, Executor executor) {
        CompletableFuture<Task> future = CompletableFuture.completedFuture(task);
        for (Processor processor : processors) {
            future = future.thenComposeAsync(t -> processor.process(t, executor), executor);
        }
        return future.thenAccept(result ->log.info("处理完成,结果是:" + result));
    }

    public static void sleep(Integer i){
        try {
            TimeUnit.SECONDS.sleep(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

测试代码:

    public static void main(String[] args) {
        Executor executor = Executors.newCachedThreadPool();
        ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));
        Task task = new Task("1");
        CompletableFuture<Void> exceptionally = processorChain.process(task, executor).exceptionally(ex -> {
            System.out.println("处理过程中出现错误:" + ex.getMessage());
            return null;
        });
        log.info("处理完成");
    }

输出:

22:38:35.276 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:38:35.276 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:38:36.285 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:38:36.285 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 is processing
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 end
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 is processing
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 end
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 is processing
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 end
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@6dfb9646

效果挺符合诉求的,但这里也有几个疑问。

ProcessorChain#process中的执行顺序问题

先看 ProcessorChain#proccess

    public CompletableFuture<Void> process(Task task, Executor executor) {
        CompletableFuture<Task> future = CompletableFuture.completedFuture(task);
        for (Processor processor : processors) {
            future = future.thenComposeAsync(t -> processor.process(t, executor), executor);
        }
        return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));
    }

朋友原话是:“他期望的执行顺序是 process1process2process3。但由于 thenComposeAsync是异步执行的,那么在循环到 process2 时,它会异步执行。此时,循环可能已经到了 process3,并开始执行 thenComposeAsync。这样,process3process2可能会并行执行,这跟期望结果不一致”。

其实这里是不会的,thenComposeAsync 可以翻译为 “然后异步组合”。

这里的 “组合” 指的是将当前的 CompletableFuture 与另一个 CompletableFuture 进行组合。“异步” 指的是这里的 Function 操作会在一个单独的线程中执行,不会阻塞当前的线程。

我这里的 Processor#process 函数返回的是 CompletableFuture,所以这里 thenComposeAsync 的流程是:等待当前的 CompletableFuture 完成后,异步执行 FunctionFunction 会返回一个新的 CompletableFuture,然后 thenComposeAsync 会返回这个 CompletableFuture

这里要注意 thenApplyAsyncthenComposeAsync 不要混淆了。

ProcessorChain#process中的阻塞与非阻塞、同步和异步问题

上文已经说明了 ProcessorChain#process 中的执行顺序问题。接下来看一下这段代码中的阻塞与非阻塞、同步和异步问题。

为了查看方便,我这里再贴一下测试代码和 ProcessorChain#proccess 的实现。

测试代码:

    public static void main(String[] args) {
        Executor executor = Executors.newSingleThreadExecutor();
        ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));
        Task task = new Task("1");
        CompletableFuture<Void> c = processorChain.process(task, executor).exceptionally(ex -> {
            System.out.println("处理过程中出现错误:" + ex.getMessage());
            return null;
        });
        log.info("处理完成");
    }

ProcessorChain#proccess

    public CompletableFuture<Void> process(Task task, Executor executor) {
        CompletableFuture<Task> future = CompletableFuture.completedFuture(task);
        for (Processor processor : processors) {
            future = future.thenComposeAsync(t -> processor.process(t, executor), executor);
        }
        return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));
    }

执行流程如下:

  1. main 线程开始执行,并调用 ProcessorChainprocess 方法。
  2. ProcessorChainprocess 方法中,main 线程创建了一个已经完成的 CompletableFuture,并开始遍历 Processor 对象。
  3. main 线程调用 thenComposeAsync 方法,此时 main 线程将 Processor1process 方法提交给 Executor,并立即返回一个新的 CompletableFuturemain 线程不会等待 Processor1process 方法完成,因此 main 线程是非阻塞的。
  4. Executor 的一个线程(称之为 Thread_1)开始执行 Processor1process 方法。由于 thenComposeAsync 的链式调用,Processor2process 方法会等待 Processor1process 方法完成后才开始执行,以此类推。
  5. main 线程继续遍历 Processor 对象,并对每个 Processor 重复步骤 3 和 4。每次调用 thenComposeAsync 方法时,main 线程都会立即返回一个新的 CompletableFuture,并将 Processorprocess 方法提交给 Executor。这意味着 main 线程是非阻塞的,而 Executor 的线程会按照 Processor 的顺序执行 process 方法。
  6. 当所有的 Processorprocess 方法都完成后,其中一个 Executor 的线程会执行 thenAccept 方法,处理最后一个 Processorprocess 方法返回的结果。

其实这里的 CompletableFuture<Task> future = CompletableFuture.completedFuture(task); 只是作为一个“CompletableFuture 的启动器”(这么一说是不是就更好理解了)。

响应式编程

其实上面的内容让我想起了响应式编程,引入维基百科的一个说明:

例如,在命令式编程环境中,a:=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化 。

其实这里的 CompletableFuture 实际上是响应式编程的一个简单例子,因为它表示一个异步计算的结果,我们可以在它上面“提前”注册回调函数(我觉得这个“提前”是精髓),这些回调函数会在 CompletableFutureFunction 完成时被调用。而我这里就是提前准备好了一个结果,即 CompletableFuture,然后不断的循环在其上面增加回调。

可以增加日志看一下:

    public CompletableFuture<Void> process(Task task, Executor executor) {
        CompletableFuture<Task> future = CompletableFuture.completedFuture(task);
        for (Processor processor : processors) {
            log.info(future);
            future = future.thenComposeAsync(t -> processor.process(t, executor), executor);
            log.info(future);
          // future.thenComposeAsync(t -> processor.process(t, executor), executor);
        }
        return future.thenAccept(result ->log.info("处理完成,结果是:" + result));
    }

再次执行测试代码:

22:56:42.512 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@3fa77460[Completed normally]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5fdef03a[Not completed]
22:56:42.614 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:56:42.615 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 is processing
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 end
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 is processing
22:56:48.630 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 end
22:56:48.631 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 is processing
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 end
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@58d4f75a

重新梳理执行流程:

  1. 最开始的启动器是 3fa77460,这是一个已经完成的 CompletableFuture 对象,用于启动异步操作链。

  2. 第一次循环:3fa77460 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 e2d56bf。此时,Processor1process 方法正在异步执行。

  3. 第二次循环:e2d56bf 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 244038d0。此时,Processor2process 方法正在等待 Processor1process 方法完成。

  4. 第三次循环:244038d0 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 5680a178。此时,Processor3process 方法正在等待 Processor2process 方法完成。

  5. 第四次循环:5680a178 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 5fdef03a。此时,Processor4process 方法正在等待 Processor3process 方法完成。

  6. 所有循环结束后,返回 5fdef03a。当所有的 Processorprocess 方法都完成后,5fdef03a 会执行 thenAccept 方法,处理最后一个 Processorprocess 方法返回的结果。

这个过程中,每个 CompletableFuture 对象都会等待前一个 CompletableFuture 对象完成,然后开始执行自己的异步操作。这就形成了一个 CompletableFuture 链,每个 CompletableFuture 对象都会按照 Processor 的顺序执行 process 方法。

CompletableFuture链

当调用thenComposeAsync方法时,会创建一个新的CompletableFuture对象:

    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn,
        Executor executor) {
        return uniComposeStage(screenExecutor(executor), fn);
    }
    private <V> CompletableFuture<V> uniComposeStage(
        Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
        if (f == null) throw new NullPointerException();
        Object r; Throwable x;
        if (e == null && (r = result) != null) {
            // try to return function result directly
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    return new CompletableFuture<V>(encodeThrowable(x, r));
                }
                r = null;
            }
            try {
                @SuppressWarnings("unchecked") T t = (T) r;
                CompletableFuture<V> g = f.apply(t).toCompletableFuture();
                Object s = g.result;
                if (s != null)
                    return new CompletableFuture<V>(encodeRelay(s));
                CompletableFuture<V> d = new CompletableFuture<V>();
                UniRelay<V> copy = new UniRelay<V>(d, g);
                g.push(copy);
                copy.tryFire(SYNC);
                return d;
            } catch (Throwable ex) {
                return new CompletableFuture<V>(encodeThrowable(ex));
            }
        }
        CompletableFuture<V> d = new CompletableFuture<V>();
        //构建 UniCompose
        UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
        return d;
    }

构造的 UniCompose对象持有前一个CompletableFuture对象的引用,也就是 UniComposesrc 字段。所以即使循环中出现 CompletableFuture 被覆盖,也不用担心被 GC 的问题。

总结

本文以一个实际的场景设计问题为出发点,详细探讨了 CompletableFuture 的链式执行机制,对执行流程和线程切换进行了深入的分析,并对响应式编程的概念和应用进行了简单的讨论。

References

  • https://zh.wikipedia.org/zh-cn/%E5%93%8D%E5%BA%94%E5%BC%8F%E7%BC%96%E7%A8%8B

欢迎关注公众号:
在这里插入图片描述

文章来源:https://blog.csdn.net/Dongguabai/article/details/135541854
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。