Java 多线程之线程池基础

发布时间:2023年12月20日

一、概述

  • 线程池是一种管理和复用线程的机制,它包含一组预先创建的线程,用于执行各种任务。线程池的主要作用是提高多线程应用程序的性能和效率,并提供对线程的生命周期和资源的管理,包括线程的创建、销毁和复用。主要作用如下:

    • 降低资源消耗:线程的创建和销毁是一项开销较大的操作。通过使用线程池,可以避免频繁地创建和销毁线程,从而降低了系统的资源消耗。
    • 提高响应速度:线程池中的线程可以立即执行任务,而不需要等待线程的创建。这可以显著提高任务的响应速度,尤其是在大量并发任务的情况下。
    • 控制并发线程数量:线程池可以限制并发线程的数量,防止系统因为创建过多的线程而导致资源耗尽或性能下降。通过设置适当的线程池大小,可以在充分利用系统资源的同时,避免线程过多带来的负面影响。
    • 提供线程管理和监控:线程池提供了对线程的生命周期和状态的管理。可以通过线程池来监控线程的执行情况、统计任务执行时间等,方便调试和性能优化。
    • 支持任务队列:线程池通常与任务队列结合使用,将待执行的任务放入队列中,由线程池中的线程按照一定的调度策略进行执行。任务队列可以缓冲任务,避免任务提交过快导致系统负荷过大,同时提供了一种异步执行任务的机制。
  • 线程池相关类关系图

在这里插入图片描述

二、Executors 说明

  • 使用线程池,最简单的方法就是使用 Executors 类。Executors 有以下核心方法

    • newSingleThreadExecutor 创建一个只有一个线程的线程池对象,使用 SynchronousQueue 保存任务队列。
    • newFixedThreadPool 创建一个固定核心线程的线程池对象,使用 LinkedBlockingQueue 保存任务队列。
    • newCachedThreadPool 创建一个没有核心线程的线程池对象,使用 LinkedBlockingQueue 保存任务队列。没有线程空闲,就新起一个线程来处理任务。
    • newScheduledThreadPool 创建一个定时任务线程池对象,使用 DelayedWorkQueue 保存任务队列。
    • newWorkStealingPool 创建一个任务分叉线程池对象,每个线程都单独的任务队列,当自己没有任务的时候再去别人任务队列拿任务。
  • Executors 使用方法

    • 使用 Executors 创建线程池对象 executorService 后可以执行任务。
    • 可以使用 ExecutorService 的 execute 方法异步执行无返回值的 Runnable 接口任务。
    • 可以使用 ExecutorService 的 submit 方法异步执行有返回值的 Runnable、Callable 接口任务。
    • 可以使用 ExecutorService 的 invokeXXX 方法同步执行 Runnable、Callable 接口任务。
        private static void testExecutors() throws ExecutionException, InterruptedException {
            // 创建一个线程池,核心线程数为2(默认运行的线程数量为2)
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            //ExecutorService executorService = Executors.newCachedThreadPool();
            //ExecutorService executorService = Executors.newSingleThreadExecutor();
    
            // 方法一:使用线程池执行任务,没有返回值
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    someTask();
                }
            });
            // 方法二:使用线程池提交任务,有返回值
            Future<Boolean> future = executorService.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    someTask();
                    return true;
                }
            });
            // 方法三:执行任务多个,有返回值
            List<Callable<Boolean>> callables = new ArrayList<>();
            callables.add(()->{
                // 这是一个 Lambda 表达式任务
                return true;
            });
            List<Future<Boolean>> futures = executorService.invokeAll(callables);
    
            // 等待任务执行完成
    
            executorService.shutdown();
        }
    
        private static void someTask() {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("这里是一个复杂的业务逻辑(耗时任务)");
        }
    

二、线程池 Hello World

  • 下面是使用 Executors 创建线程池并执行线程的示例。

    • Executors 的静态方法 newFixedThreadPool 可以创建线程池,并返回 ExecutorService 对象。
    • ExecutorService 对象的 submit 方法运行任务,任务可以是 Runnable 和 Callable。
  • 下面创建一个线程池,包括2个核心线程,执行一个 Lambda 表达式定义的任务

    • 注意:我这里只添了一个任务,实际使用时可以使用 submit 添加多个任务到线程池。
        private static void  test1() throws ExecutionException, InterruptedException {
            // 创建一个线程池,核心线程数为2(默认运行的线程数量为2)
            ExecutorService executorService = Executors.newFixedThreadPool(2);
    
            // submit 是异步的
            Future<Integer> future = executorService.submit(() -> {
                // 这里是一个复杂的业务逻辑(耗时任务)
                return 1;
            });
            // get 是同步的,阻塞的
            int result = future.get();
            System.out.println("result="+result);
    
            executorService.shutdown();
        }
    
  • 为了方便理解,我将上面的 Lambda 表达式任务改成 Callable 接口任务,如下:

        private static void  test2() throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
    
            Callable callable = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    // 这里是一个复杂的业务逻辑(耗时任务)
                    return 2;
                }
            };
    
            // submit 是异步的
            Future<Integer> future = executorService.submit(callable);
            // get 是同步的,阻塞的
            int result = future.get();
            System.out.println("result="+result);
    
            executorService.shutdown();
        }
    

三、FutureTask 的作用

  • 从上面线程池的例子中可以看出当向线程池中提交任务(submit)时返回了一个 Future 对象。

  • FutureTask是Java中的一个实现了Future和Runnable接口的类,用于表示一个异步计算任务。用于获取计算结果、取消任务、判断任务是否完成等功能。主要方法如下:

    • get() 获取由FutureTask执行的计算任务的结果。如果计算任务尚未完成,get()方法会阻塞当前线程,直到计算任务完成并返回结果。
    • cancel() 取消尚未开始或正在执行的任务。取消任务后,如果任务尚未开始执行,则可以阻止任务的执行;如果任务正在执行,则可以尝试中断任务的执行。
    • isDone() 判断FutureTask封装的任务是否已经完成。当任务执行完成或被取消时,isDone()方法会返回true。
  • 下面使用FutureTask包装 Lambda 任务,然后新创建一个线程来执行,再使用 FutureTask 的 get 方法获取返回值。

    • 注意:任务类型如果是 Runnable 类型是不能获取返回值,而任务是 Callable 类型才获取返回值。因为 Runnable 的 run 无返回值,而 Callable 的 call 方法有返回值。
        private static void  test3() throws ExecutionException, InterruptedException {
    
            FutureTask<Integer> futureTask = new FutureTask<>(() -> {
                // 这里是一个复杂的业务逻辑(耗时任务)
                return 3;
            });
    
            new Thread(futureTask).start();
    
            System.out.println("result="+futureTask.get());
        }
    

四、CompletableFuture 的作用

  • Future 还有一个很好用的子类叫 CompletableFuture 。他简化了 Future 类型任务执行,主要功能如下:

    • 异步执行:CompletableFuture可以用于执行异步操作,例如网络请求、数据库查询等,而不会阻塞主线程。通过supplyAsync()或runAsync()等方法,可以将任务提交给线程池执行,并返回一个CompletableFuture对象,表示异步任务的执行结果。
    • 链式操作:CompletableFuture支持链式操作,可以通过方法链的方式组合多个操作,形成一个任务流水线。通过调用thenApply()、thenAccept()、thenCompose()等方法,可以在任务完成后触发回调函数,并对任务的结果进行处理或执行其他操作。
    • 合并多个任务:CompletableFuture提供了一些静态方法,如allOf()、anyOf()等,用于合并多个CompletableFuture对象的结果。allOf()方法在所有任务完成后返回一个CompletableFuture,它的结果是一个包含所有任务结果的CompletableFuture数组。anyOf()方法在任意一个任务完成后返回一个CompletableFuture,它的结果是第一个完成的任务的结果。
    • 异常处理:CompletableFuture提供了一些方法,如exceptionally()和handle(),用于处理任务执行过程中的异常情况。可以在任务执行出现异常时触发回调函数,并对异常进行处理或返回一个默认值。
    • 等待任务完成:可以使用join()方法来等待CompletableFuture的任务执行完成,并获取任务的结果。与get()方法不同的是,join()方法不会抛出已检查异常,使得它更适合在函数式编程中使用。
  • CompletableFuture 使用示例

        private static void  test4() throws ExecutionException, InterruptedException {
    
            CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
                // 这里是一个复杂的业务逻辑(耗时任务)
                return 1;
            });
            CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
                // 这里是一个复杂的业务逻辑(耗时任务)
                return 2;
            });
            CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
                // 这里是一个复杂的业务逻辑(耗时任务)
                return 3;
            });
            // 等待所有任务完成
            CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3);
            System.out.println("r1="+completableFuture1.get()+",r2="+completableFuture2.get()+",r3="+completableFuture3.get());
    
    
    		// 链式调用
            CompletableFuture completableFuture4 = CompletableFuture.supplyAsync(() -> {
                // 这里是一个复杂的业务逻辑(耗时任务)
                return 4;
            }).thenApply(String::valueOf).thenApply(text->"hello "+ text).thenAccept(System.out::println);
        }
    

五、定时任务线程池使用

  • 使用 Executors.newScheduledThreadPool 可以创建定时任务线程池,定时任务线池可以使任务在某个时间点上执行.

  • 如下,创建的线程池执行任务时,任务将在30分钟以后执行。

        private static void test5() throws ExecutionException, InterruptedException {
    
            // 如下,每隔30分钟执行一次任务
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
            scheduledExecutorService.scheduleWithFixedDelay(() -> {
                // 这里是一个复杂的业务逻辑(耗时任务)
            }, 0, 30, TimeUnit.MINUTES); // 30 表示延后30分钟
    
    
            // 等待任务执行
    
            scheduledExecutorService.shutdown();
        }
    

六、任务分叉线程池

  • 任务分叉合并线程池内部使用 ForkJoinPool 实现,并且每个线程都有自己的工作队列。当某个线程的队列为空时,它可以从其他线程的队列中“窃取”任务来执行。

    任务分叉合并线程池 ForkJoinPool 可以执以下两种任务

    • RecursiveAction 不带返回值的递归任务

    • RecursiveTask 带返回值的递归任务

            RecursiveAction action = new RecursiveAction() {
                @Override
                protected void compute() {
    				// 复杂计算任务
                }
            };
            RecursiveTask task = new RecursiveTask() {
                @Override
                protected Object compute() {
                    // 复杂计算任务
                    return null;
                }
            };
            ForkJoinPool p = new ForkJoinPool();
            p.execute(action);
            p.execute(task);
    
  • 这种线程池适用于执行大量的独立、耗时较长的任务,可以更好地利用多核处理器的性能。

    请注意,工作窃取线程池在不同的操作系统和Java版本上可能会有一些差异,因此在具体的应用场景中,你可能需要根据性能和需求进行调优。

        private static void test6() throws ExecutionException, InterruptedException {
    
            // 分叉任务线程池
            ExecutorService executorService = Executors.newWorkStealingPool();
            executorService.execute(() -> {
                // 这里是一个复杂的业务逻辑(耗时任务)
            });
    
    
            // 等待任务执行
    
            executorService.shutdown();
        }
    
文章来源:https://blog.csdn.net/qifu123/article/details/135113978
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。