ThreadPoolExecutor
是 Java 中用于创建和管理线程池的接口,当线程池中的任务队列已满,并且线程池中的线程数量已经达到最大时,如果再有新的任务提交,就需要一个策略来处理这些无法执行的任务。它 提供了四种拒绝策略,都是 RejectedExecutionHandler
接口的实现,如下:
RejectedExecutionException
异常。execute
方法的线程来运行任务,如果执行程序已经关闭,那么任务将被抛弃。以下是使用ThreadPoolExecutor的代码示例,如下:
import java.util.concurrent.*;
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
// 创建一个固定大小的线程池
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10L;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
// 使用不同的拒绝策略
RejectedExecutionHandler abortPolicy = new ThreadPoolExecutor.AbortPolicy();
RejectedExecutionHandler callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
RejectedExecutionHandler discardPolicy = new ThreadPoolExecutor.DiscardPolicy();
RejectedExecutionHandler discardOldestPolicy = new ThreadPoolExecutor.DiscardOldestPolicy();
// 创建线程池并设置拒绝策略
ThreadPoolExecutor executor1 = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, abortPolicy);
ThreadPoolExecutor executor2 = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, callerRunsPolicy);
ThreadPoolExecutor executor3 = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, discardPolicy);
ThreadPoolExecutor executor4 = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, discardOldestPolicy);
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor1.execute(() -> System.out.println("Executing task " + taskId + " with AbortPolicy"));
executor2.execute(() -> System.out.println("Executing task " + taskId + " with CallerRunsPolicy"));
executor3.execute(() -> System.out.println("Executing task " + taskId + " with DiscardPolicy"));
executor4.execute(() -> System.out.println("Executing task " + taskId + " with DiscardOldestPolicy"));
}
}
}
在上面的代码示例中,由于工作队列和线程池的大小都设置得很小,所以提交的任务会很快填满队列和线程池,从而触发拒绝策略,不同的拒绝策略适用于不同的场景,在实际应用中,应该根据具体需求来合理设置线程池的大小、工作队列的容量以及拒绝策略。
ThreadFactory
接口常用于在ThreadPoolExecutor
中管理控制线程的名称、优先级、是否守护线程以及其他线程属性。下面是一个简单的ThreadFactory
使用案例,案例中定义了一个CustomThreadFactory类,如下代码:
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadFactory implements ThreadFactory {
private final String prefix;
private final AtomicInteger threadNumber = new AtomicInteger(1);
public CustomThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public Thread newThread(Runnable r) {
// 创建一个新线程
Thread thread = new Thread(r);
// 设置线程名称,使用前缀和递增的数字
thread.setName(prefix + "-" + threadNumber.getAndIncrement());
// 可以设置其他属性,比如优先级、守护状态等
// thread.setPriority(Thread.MAX_PRIORITY);
// thread.setDaemon(false);
return thread;
}
}
在上面代码中:
CustomThreadFactory
类实现了ThreadFactory
接口,并重写了newThread
方法。prefix
参数,用于生成线程名称的前缀。AtomicInteger
来生成唯一的线程编号,确保在多线程环境下编号不会重复。newThread
方法中,创建一个新的Thread
对象,并通过setName
方法设置线程的名称,这里使用前缀和递增的编号来构建线程名称,方便在日志或调试时识别线程。Thread
方法来设置线程的优先级、守护状态等属性。如下是CustomThreadFactory的使用方法,如下:
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.SynchronousQueue;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建一个具有自定义线程工厂的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // unit for keepAliveTime
new SynchronousQueue<>(), // workQueue
new CustomThreadFactory("CustomThreadPool-") // 自定义线程工厂
);
// 提交任务给线程池
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.submit(() -> {
// 执行任务
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
});
}
// 关闭线程池(这通常是在应用程序关闭时完成的)
// executor.shutdown();
}
}
END!