JUC 线程池

发布时间:2024年01月17日

线程池

线程池是一种利用池化技术思想来实现的线程管理技术,主要是为了复用线程、便利地管理线程和任务、并将线程的创建和任务的执行解耦开来。我们可以创建线程池来复用已经创建的线程来降低频繁创建和销毁线程所带来的资源消耗。

线程池的优点

  • 提高资源利用率:通过重复利用已创建的工作线程,减少了创建和销毁线程所消耗的时间和内存开销。
  • 提高响应速度:当有新的任务到达时,可以立即分配给空闲或者新建的工作线程处理,而不需要等待创建新线程所需时间。
  • 系统稳定性:通过设置合理地参数(如核心线程数、最大线程数、队列容量、拒绝策略等),可以有效地控制并发度和负载情况,避免因为过多或过少地使用线程导致系统崩溃或性能下降。
  • 统一管理资源:线程池可以统一管理任务队列和线程,可以统一开始或结束任务,比单个线程一个个地处理任务更方便,管理更方便,同时也有利于数据统计。

Executor 基础架构

在这里插入图片描述

Executor接口(java.util.concurrent.Executor)

它是Executor的基础与核心,其定义如下:

public interface Executor {

    /**
     * 执行任务
     * @param 要执行的任务
     * @throws RejectedExecutionException 如果任务无法执行
     * @throws NullPointerException 如果 command 为空
     */
    void execute(Runnable command);
}

Executor 接口将任务的提交与执行分离开来。

ExecutorService

ExecutorService继承于Executor,它提供一些可以管理任务的执行、取消、结束的方法。

public interface ExecutorService extends Executor {

    /**
     * 有序关闭,调用此方法后,不再接受新的任务提交,但之前提交的任务不会关闭。
     * 此方法也不等待以前提交的任务执行完成,awaitTermination 方法才能实现该操作
     *
     */
    void shutdown();

    /**
     * 尽力停止处理正在执行的任务
     * 尝试停止所有正在执行的任务,停止处理等待的任务,并返回等待执行的任务列表。
     *
     * @return 从未开始执行的任务列表
     */
    List<Runnable> shutdownNow();

    /**
     * 返回 true 表示已经关闭
     *
     */
    boolean isShutdown();

    /**
     * 如果关闭后所有任务都已完成,则返回true。
     * 注:除非先调用shutdown或shutdownNow,否则isTerminated永远不会为true。
     *
     * @return 如果关闭后所有任务都已完成,则为true
     */
    boolean isTerminated();

    /**
     * 在 shutdown 之后,保证所有任务在关闭请求之后完成执行(正常执行完毕、超时或被中断)
     *
     * @param 等待的最长时间
     * @param 时间单位
     * @return {@code true} 如果执行程序已终止
     *         {@code false} 如果在终止前超时
     * @throws InterruptedException 在等待中被打断时抛出异常
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个带返回值的任务执行
     * 通过 Future.get() 方法获取返回结果
     *
     * @param Callable 类型的任务
     * @param <T> 任务返回结果泛型
     * @return 返回一个 Future 对象
     * @throws 如果任务无法被安排执行抛出 RejectedExecutionException 异常
     * @throws task 为空,抛出 NullPointerException 异常
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一个任务 task,通过 Future.get() 方法获取 result 
     * submit 方法在底层实际是执行的 execute 方法,execute 方法没有返回值,
     * submit 方法在调用 execute 时,使用了 Future 架构进行了扩展而已
     * 关于 Future 我们会在后续讲解
     *
     * @param task 提交执行的任务
     * @param result 通过 Future.get() 方法获取的返回结果
     * @param <T> 返回结果泛型
     * @return 返回 Future 对象
     * @throws 如果任务无法被安排执行抛出 RejectedExecutionException 异常
     * @throws task 为空,抛出 NullPointerException 异常
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个任务 task,通过 Future.get() 方法返回 null(因为没有返回结果)
     *
     * @param task 提交执行的任务
     * @return 返回 Future 对象
     * @throws 如果任务无法被安排执行抛出 RejectedExecutionException 异常
     * @throws task 为空,抛出 NullPointerException 异常
     */
    Future<?> submit(Runnable task);

    /**
     * 批量提交带返回值的任务
     * 
     * 注:已完成的任务可能是正常结束也可能出现异常结束
     * 
     * @param tasks 任务集合
     * @param <T> 集合任务返回值泛型
     * @return Future 对象列表
     * @throws InterruptedException 在等待时被中断
     * @throws NullPointerException 列表为空或列表中某个任务为空
     * @throws RejectedExecutionException 任意任务无法安排执行
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * 带超时的批量提交任务
     * 
     * 如果超时,则某些任务将无法完成
     * 
     * @param tasks 任务列表
     * @param timeout 超时时长
     * @param unit 时间单位
     * @param <T> 结果泛型
     * @return Future 对象列表
     * @throws InterruptedException 在等待时被中断
     * @throws NullPointerException 列表为空或列表中某个任务为空
     * @throws RejectedExecutionException 任意任务无法安排执行
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 一旦有任意一个任务正常完成(执行过程中没有抛异常),线程池会终止其他未完成的任务。
     * 
     *
     * @param tasks 任务列表
     * @param <T> 返回值泛型
     * @return 返回执行成功的任务的返回值
     * @throws InterruptedException 在等待时被打断
     * @throws NullPointerException 任务列表或任意任务为空
     * @throws IllegalArgumentException 任务列表为空列表
     * @throws ExecutionException 如果任务列表中没有执行成功的任务
     * @throws RejectedExecutionException 如果任务列表无法被安排执行
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * 带超时的有任意一个任务正常完成(执行过程中没有抛异常),线程池会终止其他未完成的任务。
     * 
     *
     * @param tasks 任务列表
	 * @param timeout 超时时长
     * @param unit 时间单位
     * @param <T> 返回值泛型
     * @return 返回执行成功的任务的返回值
     * @throws InterruptedException 在等待时被打断
     * @throws NullPointerException 任务列表或任意任务为空
     * @throws IllegalArgumentException 任务列表为空列表
     * @throws ExecutionException 如果任务列表中没有执行成功的任务
     * @throws RejectedExecutionException 如果任务列表无法被安排执行
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

submit 方法在底层实际是执行的 execute 方法,execute 方法没有返回值,submit 方法在调用 execute 时,使用了 Future 架构进行了扩展而已,关于 Future 我们会在后续讲解,我先把线程池的 API 搞清楚再说。

ThreadPoolExecutor、ScheduledThreadPoolExecutor

ThreadPoolExecutor、ScheduledThreadPoolExecutor(提供任务调度的线程池) 是 ExecutorService 的两个具体的实现

通过 Executors 创建线程池

注:在阿里的Java开发规约中,不推荐使用 Executors 创建线程池,我们这里只是为了先介绍 ExecutorService 的相关 API 方法,后续我们会专门讨论正确的线程池创建方式

// 创建 ThreadPoolExecutor 线程池
// 只有一个线程都线程池
ExecutorService pool1 = Executors.newSingleThreadExecutor();
// 固定大小的线程池(其实就是核心线程和最大线程数都为10)
ExecutorService pool2 = Executors.newFixedThreadPool(10);
// 原则上无限大小的线程池(最大线程数为:Integer.MAX_VALUE)
ExecutorService pool3 = Executors.newCachedThreadPool();


// 创建 ScheduledThreadPoolExecutor 线程池
Executors.newSingleThreadScheduledExecutor();
Executors.newScheduledThreadPool(10);

其实这些方法底层都使用了 ThreadPoolExecutor、ScheduledThreadPoolExecutor 对应到构造方法,这个我们后续再说,本章我们先说说 ExecutorService 的这些 API 方法的使用。

提交执行任务

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Slf4j
public class SubmitTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> f1 = pool.submit(()-> {
            log.debug("callable task");
            return "callable task";
        });
        Future<String> f2 = pool.submit(()-> log.debug("Runnable task,has result"),"Runnable task,has result" );
        Future<?> f3 = pool.submit(()-> log.debug("Runnable task,has no result"));

        log.debug("f1 结果:{}",f1.get());
        log.debug("f2 结果:{}",f2.get());
        log.debug("f3 结果:{}",f3.get());

    }

}

结果:

14:07:22.489 [pool-1-thread-1] DEBUG com.yyoo.thread.executor.SubmitTest - callable task
14:07:22.489 [pool-1-thread-2] DEBUG com.yyoo.thread.executor.SubmitTest - Runnable task,has result
14:07:22.495 [pool-1-thread-2] DEBUG com.yyoo.thread.executor.SubmitTest - Runnable task,has no result
14:07:22.495 [main] DEBUG com.yyoo.thread.executor.SubmitTest - f1 结果:callable task
14:07:22.497 [main] DEBUG com.yyoo.thread.executor.SubmitTest - f2 结果:Runnable task,has result
14:07:22.497 [main] DEBUG com.yyoo.thread.executor.SubmitTest - f3 结果:null

日志反应的比较清楚,我们的线程池只有2个线程,线程2 执行了两个提交的任务,f1 的结果为任务返回的结果,f2 的结果为我们提交任务是传的结果,f3 不返回结果(因为 Runnable 接口的 run 方法返回值为 void)

这里的线程名称格式为:pool-[虚拟机中线程池编号]-thread-[线程编号],如 pool-1-thread-1,在实际使用时我们最好自定义线程名称,以提高日志的可读性,避免日后排查日志无法第一时间确定执行的线程。

执行的任务中报异常

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class ExecutorTest {

    public static void main(String[] args) {

        ExecutorService pool = Executors.newFixedThreadPool(2);

        // 某个任务执行失败,并不影响其他任务执行
        // 且执行该任务的线程还能执行其他任务
        Future<?> f = pool.submit(()->{
           int x = 0;
           log.debug("{}",10 / x);
        });

        pool.submit(()->{
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("是否能正常执行");
        });

        pool.submit(()->{
            log.debug("是否能正常执行");
        });

        pool.submit(()->{
            log.debug("是否能正常执行");
        });

        try {
            // 如果不调用 get 方法,则如果任务执行失败,程序也不会有异常打印
            // 如果任务中出现异常,则 get 方法会抛出 ExecutionException,其中包含任务执行时的真正异常
            // 这也是为什么 submit(Runnable task) 方法明明不需要返回值,为啥还要返回 Future<?> 的原因
            log.debug("{}",f.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        pool.shutdown();
    }

}

注:如果不使用 Future 的 get 方法来获取和处理异常,就需要我们在任务方法内进行 try…catch 处理。

停止执行任务

我们上面提交任务的示例中没有调用 shutdown 方法进行线程池的关闭,我们的任务执行完成之后,程序不会停止。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class ShutdownTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> f1 = pool.submit(()-> {
            log.debug("callable task");
            TimeUnit.SECONDS.sleep(2);
            return "callable task";
        });
        Future<String> f2 = pool.submit(()-> {
            log.debug("Runnable task,has result");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        },"Runnable task,has result" );

        // 在此处调用关闭线程池,f3 尚未提交,将不会被执行,且 submit 方法会抛出 RejectedExecutionException,程序会终止
//        pool.shutdown();
        Future<?> f3 = pool.submit(()-> {
            log.debug("Runnable task,has no result");
        });


        // 在此处调用关闭线程池,任务都已提交,将会执行完成
        pool.shutdown();

        // Future 的 get 方法是个阻塞方法,它会等待线程执行完成返回结果
        log.debug("f1 结果:{}",f1.get());
        log.debug("f2 结果:{}",f2.get());
        log.debug("f3 结果:{}",f3.get());

        // 在此处调用关闭线程池,会正常关闭
//        pool.shutdown();
    }


}

shutdown 关闭时,已经提交的任务会尽可能的执行完毕,但未提交的任务将抛出 RejectedExecutionException 且不执行。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class ShutdownTest1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> f1 = pool.submit(()-> {
            log.debug("callable task");
            TimeUnit.SECONDS.sleep(2);
            return "callable task";
        });
        Future<String> f2 = pool.submit(()-> {
            log.debug("Runnable task,has result");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        },"Runnable task,has result" );

        // 在此处调用关闭线程池,f3 尚未提交,将不会被执行,且 submit 方法会抛出 RejectedExecutionException,程序会终止
//        pool.shutdownNow();
        Future<?> f3 = pool.submit(()-> {
            log.debug("Runnable task,has no result");
        });


        // 在此处调用关闭线程池,任务都已提交,但 f3 还在等待执行,将抛出 InterruptedException,且不执行 f3,并且程序会终止
        pool.shutdownNow();

        // Future 的 get 方法是个阻塞方法,它会等待线程执行完成返回结果
        log.debug("f1 结果:{}",f1.get());
        log.debug("f2 结果:{}",f2.get());
        log.debug("f3 结果:{}",f3.get());

        // 由于 Future 的 get 方法的阻塞,当执行此关闭方法时,任务已经执行完毕
//        pool.shutdownNow();
    }
}

shutdownNow 方法调用时,会尽量执行完已提交且正在执行的任务,未提交的任务将抛出 RejectedExecutionException,已提交未执行的任务将抛出 InterruptedException

shutdown 和 shutdownNow 时,如果 submit 方法抛出响应的异常,如果没有处理相关异常,则主程序将终止,在实际使用中根据自身需要来处理这些异常情况

创建任务调度线程池

延迟执行任务

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ScheduledPollTest {

    public static void main(String[] args) {

        // 可执行任务调度的线程池
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        // 延迟 5 秒执行,任务为 Runnable 没有返回值
        ScheduledFuture<?> f1 = pool.schedule(() -> {
            log.debug("延迟 5 秒执行的任务");
        }, 5, TimeUnit.SECONDS);

        ScheduledFuture<?> f2 = pool.schedule(() -> {
            log.debug("延迟 5 秒执行的任务,带返回值");
            return 20;
        }, 5, TimeUnit.SECONDS);


        // 由于前两个任务延迟 5 秒执行,线程池线程数量为 2,将导致此线程将延迟 5+ 秒才会执行
        // + 的时间就是前两个线程执行的时间(如果你为前两个线程添加sleep时间,这个时间就更能清晰体现了)
        // 如果为前两个线程添加 sleep 时间,这里的 + 的时间为 sleep 短的时间 + 其执行时间
        ScheduledFuture<?> f3 = pool.schedule(() -> {
            log.debug("延迟 5 秒执行的任务 2 ,带返回值");
            return 50;
        }, 5, TimeUnit.SECONDS);


        // 线程池记得调用此方法,以关闭线程池
        pool.shutdown();

    }

}

结果:

16:15:28.352 [pool-1-thread-1] DEBUG com.yyoo.thread.executor.ScheduledPollTest - 延迟 5 秒执行的任务
16:15:28.352 [pool-1-thread-2] DEBUG com.yyoo.thread.executor.ScheduledPollTest - 延迟 5 秒执行的任务,带返回值
16:15:28.357 [pool-1-thread-1] DEBUG com.yyoo.thread.executor.ScheduledPollTest - 延迟 5 秒执行的任务 2 ,带返回值

从示例可以知道 schedule 方法的 delay 延迟时间,为任务提交后所等待的时间,当提交后等待时间到,又有空闲线程可以执行它,它就将直接执行。如果没有则继续等待,直到有空闲线程执行。

按执行周期重复执行任务

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class ScheduledPollTest1 {

    public static void main(String[] args) {

        // 可执行任务调度的线程池
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        // 延迟 5 秒开始执行,执行周期间隔为 2 秒,scheduleWithFixedDelay 只能执行 Runnable 任务
        pool.scheduleWithFixedDelay(() -> {
            log.debug("延迟 5 秒执行的任务,每隔 2 秒执行一次 No1.");
        }, 5,2, TimeUnit.SECONDS);


        pool.scheduleWithFixedDelay(() -> {
            log.debug("延迟 5 秒执行的任务,每隔 2 秒执行一次. No2.");
        }, 5,2, TimeUnit.SECONDS);

        // 由于前两个任务延迟 5 秒开始执行,线程池线程数量为 2,将导致此线程将延迟 5+ 秒才会开始执行,这个和延迟执行任务一样
        pool.scheduleWithFixedDelay(() -> {
            log.debug("延迟 5 秒执行的任务,每隔 2 秒执行一次. No3.");
        }, 5,2, TimeUnit.SECONDS);
    }

}

ThreadPoolExecutor 构造器

ThreadPoolExecutor 对象的主要构造器方法如下,其余构造器都是在此构造器基础上重载的


    /**
     * 根据具体的参数创建 {ThreadPoolExecutor} 对象
     * 
     *
     * @param corePoolSize 线程池中的主线程的数量,即便是空闲状态,主线程的数量也不会减少, 除非设置{allowCoreThreadTimeOut} 为 true
     * @param maximumPoolSize 当前线程池可创建的线程最大数量,maximumPoolSize - corePoolSize 为救急线程数量
     * @param keepAliveTime 当线程数超过 corePoolSize 数量时,如果出现空闲线程,则此空闲线程保持的时间为 keepAliveTime 如果到时间后没有任务可执行,则把该线程终止
     * @param unit keepAliveTime 参数的单位
     * @param workQueue 存储通过 execute 方法提交的且尚未执行的任务(submit 最终也会调用该方法)
     * @param threadFactory 线程工厂,用于创建线程	
     * @param handler 被阻止执行时,调用的处理程序
     * @throws IllegalArgumentException 触发以下条件是抛出异常
     *         corePoolSize < 0
     *         keepAliveTime < 0
     *         maximumPoolSize <= 0
     *         maximumPoolSize < corePoolSize
     * @throws NullPointerException 如果下列参数为空,抛出异常
     * 			workQueue
     *         	threadFactory
     * 			handler
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

结合构造器的说明,我们再来看看 Executors 创建 ThreadPoolExecutor 的各个方法

Executors.newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

主线程数和最大线程数都为 1,表示该线程池只有 1 个线程,空闲线程存活时间为 0,BlockingQueue 使用的是 LinkedBlockingQueue,线程工厂和拒绝策略都是默认策略

FinalizableDelegatedExecutorService 将 ThreadPoolExecutor 对象封装了起来,让外部无法改变该线程池的任何设置(因为该线程池为Single)

  • BlockingQueue:阻塞队列
  • ThreadFactory:创建线程的工厂,Executors 中有它的默认实现(Executors.DefaultThreadFactory)
  • RejectedExecutionHandler:拒绝策略处理类

ThreadFactory

ThreadFactory 用于实例化线程,包括设置线程的名称、优先级、是否是守护线程等

public interface ThreadFactory {

    /**
     * 新建线程
     *
     * @param r 要执行的任务对象
     * @return 创建成功的线程,如果线程创建被拒绝将返回null
     */
    Thread newThread(Runnable r);
}

Executors 中有 ThreadFactory 的默认实现

static class DefaultThreadFactory implements ThreadFactory {
		// 线程池的数量
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        // 线程分组
        private final ThreadGroup group;
        // 线程数
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        // 线程名称前缀
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

在 spring-context 包中也有对其的一个实现 org.springframework.scheduling.concurrent.CustomizableThreadFactory

示例1

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
public class PoolTest {

    public static void main(String[] args) {

        /*
        核心线程为:2
        最大线程数为:5
        救急线程最长空闲时间:10 秒
        阻塞队列使用固定容量的 LinkedBlockingQueue ,容量为 15
        线程工厂使用 Spring 提供的 CustomizableThreadFactory ,并定义线程的前缀为:my-thread-pool-
         */
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,5,10, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(15),new CustomizableThreadFactory("my-thread-pool-"));

        // 提交任务之前,线程池刚刚初始化,此时线程池的线程数量为 0
        System.out.println("提交之前,线程池线程数:" + pool.getPoolSize());
        
        // 直接提交 20 个任务,我们的队列大小为 15,但此处并没有阻塞,因为一开始就有5个任务直接进入线程池执行了,剩下15个进入阻塞队列等待
        for(int i = 0; i < 20;i++){
            pool.submit(() -> {
                try {
                    log.debug(Thread.currentThread().getName());
                    TimeUnit.SECONDS.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 全部提交之后,由于任务数大于最大线程数,所以这里线程数为最大线程数5
        System.out.println("全部提交之后,线程池线程数:" + pool.getPoolSize());
        try {
            // 这里休眠100秒,因为我们20个任务每次执行5个,每次20秒,总共要花80秒
            // 再加上线程空闲后需要 10 秒才释放
            TimeUnit.SECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 执行到这里,救急线程已经被释放,所以线程数为 2 = 核心线程数
        System.out.println("最后,线程池线程数:" + pool.getPoolSize());

    }

}

这里涉及到阻塞队列的知识点,我们会在下篇文章详细介绍

RejectedExecutionHandler 拒绝策略

ThreadPoolExecutor 使用静态内部类的方式,定义了如下几个RejectedExecutionHandler 拒绝策略

  • AbortPolicy:默认拒绝策略,抛弃任务,并抛出 RejectedExecutionException 异常。如果submit时,没有try… catch 则会直接导致主线程异常结束。建议使用 try… catch,至少必要时我们还可以通过在 catch 块中打印被拒绝的任务,以提供日志排查,且不会导致主线程异常结束。
  • DiscardPolicy:直接抛弃任务,不做任何操作。
  • DiscardOldestPolicy:如果线程池没有关闭,直接抛弃队列中对头(已经等待时间最长)的任务,然后把当前任务重新加入的线程池等待执行(执行 execute 方法)
  • CallerRunsPolicy:如果线程池没有关闭,直接执行任务的run方法(直接执行run方法,相当于在主线程执行run不是多线程执行)。

示例

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
public class PoolTest {

    public static void main(String[] args) {

        /*
        核心线程为:2
        最大线程数为:5
        救急线程最长空闲时间:10 秒
        阻塞队列使用固定容量的 LinkedBlockingQueue ,容量为 15
        线程工厂使用 Spring 提供的 CustomizableThreadFactory ,并定义线程的前缀为:my-thread-pool-
         */
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,5,10, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(15),new CustomizableThreadFactory("my-thread-pool-"));

        // 提交任务之前,线程池刚刚初始化,此时线程池的线程数量为 0
        System.out.println("提交之前,线程池线程数:" + pool.getPoolSize());

        // 我们此处添加了25个任务
        for(int i = 0; i < 25;i++){
            // 我们pool创建时,没有指定拒绝策略,则默认是 AbortPolicy,所以 i >= 20 时,添加的任务都将被拒绝策略拒绝
            // 如果此处不 try ... catch 在 i = 20 的时候 submit 方法就抛出了 RejectedExecutionException 异常
            // 这会导致主线程不再往下执行直接结束,后面的打印都不会指执行了
            try {
                pool.submit(() -> {
                    try {
                        log.debug(Thread.currentThread().getName());
                        TimeUnit.SECONDS.sleep(20);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }catch (RejectedExecutionException e){
                log.debug("任务{}被拒绝",i);
                e.printStackTrace();
            }
        }

        // 全部提交之后,由于任务数大于最大线程数,所以这里线程数为最大线程数5
        System.out.println("全部提交之后,线程池线程数:" + pool.getPoolSize());
        try {
            // 这里休眠100秒,因为我们20个任务每次执行5个,每次20秒,总共要花80秒
            // 再加上线程空闲后需要 10 秒才释放
            TimeUnit.SECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 执行到这里,救急线程已经被释放,所以线程数为 2 = 核心线程数
        System.out.println("最后,线程池线程数:" + pool.getPoolSize());

    }

}
文章来源:https://blog.csdn.net/forlinkext/article/details/134691526
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。