Java并发编程: ExecutorCompletionService详解

发布时间:2024年01月19日

一、什么场景下使用ExecutorCompletionService

当在项目中我们向使用线程池处理任务时,在任务处理完成后想要的到返回值进而进行其他的逻辑处理,这个时候就可以使用ExecutorCompletionService类,任务执行完成后即可根据返回值进行其他的逻辑处理。使用提供的Executor执行任务的CompletionService。该类安排提交的任务在完成后放置在使用take可访问的队列中。该类非常轻量级,适合在处理任务组时临时使用。用法示例:
(1)假设你有一个特定问题的求解器集合,每个都返回某种类型的值Result,并且想要并发运行它们,处理每个返回非空值的结果,在某些方法中使用(Result r)。你可以这样写:

  void solve(Executor e,
             Collection<Callable<Result>> solvers)
      throws InterruptedException, ExecutionException {
      CompletionService<Result> ecs
          = new ExecutorCompletionService<Result>(e);
      for (Callable<Result> s : solvers)
          ecs.submit(s);
      int n = solvers.size();
      for (int i = 0; i < n; ++i) {
          Result r = ecs.take().get();
          if (r != null)
              use(r);
      }

(2)假设你想使用任务集的第一个非空结果,忽略任何遇到异常的结果,并在第一个任务准备好时取消所有其他任务:

void solve(Executor e,
             Collection<Callable<Result>> solvers)
      throws InterruptedException {
      CompletionService<Result> ecs
          = new ExecutorCompletionService<Result>(e);
      int n = solvers.size();
      List<Future<Result>> futures
          = new ArrayList<Future<Result>>(n);
      Result result = null;
      try {
          for (Callable<Result> s : solvers)
              futures.add(ecs.submit(s));
          for (int i = 0; i < n; ++i) {
              try {
                  Result r = ecs.take().get();
                  if (r != null) {
                      result = r;
                      break;
                  }
              } catch (ExecutionException ignore) {}
          }
      }
      finally {
          for (Future<Result> f : futures)
              f.cancel(true);
      }
 
      if (result != null)
          use(result);
  }}

二、ExecutorCompletionService详解

1、原理

ExecutorCompletionService 是 Java 中的一个类,它结合了 Executor 和 BlockQueue 的功能。其主要目的是在提交任务线程后,每个线程任务完成后,将结果放入阻塞队列中,然后可以通过阻塞队列的 take() 方法获取对应线程的执行结果。

  • 这个类采用了装饰器模式,需要用户提供一个自定义的线程池,在 ExecutorCompletionService 内部持有该线程池进行线程执行。在原有的线程池功能基础上,装饰额外的功能。

  • 在使用 ExecutorCompletionService 时,需要提供一个自定义的线程池 Executor,同时也可以指定一个自定义的队列作为线程执行结果的容器。当线程执行完成时,通过重写 FutureTask#done() 将结果压入队列中。

  • 当用户把所有的任务都提交后,可以通过 ExecutorCompletionService#poll / take方法来弹出已完成的结果。这样做的好处是可以节省获取完成结果的时间。如果不使用队列,我们需要对 FutureTask 进行遍历,因为我们不知道哪个线程先执行完成,只能挨个去获取结果,这样已经完成的线程会因为前面未完成的线程的耗时而无法提前进行汇总。如果使用队列,我们可以在其他任务线程执行的过程中汇总已完成的结果,节省汇总时间。

该类的结构如下:
在这里插入图片描述
(1)成员变量:

  • completionQueue: 阻塞队列用于存储任务完成后返回Future对象。
  • executor: 执行任务所需要的线程池对象。
  • aes: 如果executor是AbstractExecutorService的实例,则aes赋值为该对象,并用于创建FutureTask对象

(2)QueueingFuture
该类是ExecutorCompletionService的一个内部类,实现了FutureTask接口,通过重写FutrureTask中的done方法来实现任务的执行结构存放到completionQueue阻塞队列中。

  /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
2、什么情况下使用这个类

使用场景正如上面所讲述的,有两种使用场景:

  • 当我们想使用线程池,每个任务都返回某种类型的值Result,并且想要并发运行它们。
  • 使用任务集的第一个非空结果,忽略任何遇到异常的结果,并在第一个任务准备好时取消所有其他任务。

三、使用场景

结合https://blog.csdn.net/yuming226/article/details/91404365中介绍的表授权异步处理,使用ExecutorCompletionService 类的处理形式如下:

@Component
@ConditionalOnProperty(name = "xxx.execute.type", havingValue = "asyn")
@Slf4j
public class AsynExecute implements IExecute {
    private final XxxxThreadPool executor;
    public AsynExecute (XxxxThreadPool executor) {
        this.executor = executor;
    }
    @Override
    public <T> void execute(Collection<Callable<T>> tasks) {
        log.info("-- start asyn execute db-grant-right --");
        CompletionService<T> completionService = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        for (Callable<T> task : tasks) {
            futures.add(completionService.submit(task));
        }

        for (int i = 0; i < futures.size(); i++) {
            getDone(take(completionService));
        }
        log.info("-- end asyn execute xxxt --");
    }

    public static <T> T getDone(Future<T> future) {
        Objects.requireNonNull(future, "future is null");
        Preconditions.checkArgument(future.isDone(), "future not done yet");
        return getFutureValue(future);
    }

    private static <T> T getFutureValue(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupt", e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private static <T> Future<T> take(CompletionService<T> completionService) {
        try {
            return completionService.take();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted", e);
        }
    }
}

整库授权同步处理和线程池处理对比:
在这里插入图片描述

y轴表示整库授权完成消耗的时间单位为秒,x轴表示库下表的数量,使用ExecutorCompletionService后性能上有很大的提升。

文章来源:https://blog.csdn.net/yuming226/article/details/135606089
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。