线程池是一种管理和复用线程的机制,它包含一组预先创建的线程,用于执行各种任务。线程池的主要作用是提高多线程应用程序的性能和效率,并提供对线程的生命周期和资源的管理,包括线程的创建、销毁和复用。
本文主要讨论线程池执行器(ThreadPoolExecutor)的用法,在观看本文之前建议先看线程池使用入门。
ThreadPoolExecutor 的作用是创建线程池。一般情况下,我们使用 Executors 来创建线程池,使用起来非常简单方便。但是他有一个问题,就是创建池时有很参数需要调整时他就不灵活了。Executors 创建线程池本质上也是使用 ThreadPoolExecutor ,所以我们需要了解 ThreadPoolExecutor ,因为他提供了更多和线程池管理控制功能。
下面是使用 ThreadPoolExecutor 创建线程池的示例:
ThreadPoolExecutor s = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
ThreadPoolExecutor 有7个构造函数,每个参数的用途如下:
通过实现 ThreadFactory 接口,实现自定义线池创建工厂。
如下,自定义线程创建工厂,在创建线程时设置线程名称。
private static void test2() {
ThreadFactory threadFactory = new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName( "线程-" + thread.getId()); // 设置一个线程名称,方便异常追踪
// 可以在这里设置其他线程属性,例如优先级、是否为守护线程等
return thread;
}
};
ThreadPoolExecutor s = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4), threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy());
}
通过实现 RejectedExecutionHandler 接口,实现自定义线程池拒绝策略。
如下,我通过自定义线程池拒绝策略,让线程池添加任务数量大于100时,输出一个日志。
高并发实际应用中你可以记录一个日志文件或数据库,当线程池任务数量小于规定值时重新把任务加入线程池,进行线程池任务恢复,从而防止内存泄露、资源不足等异常产生,保证程序健壮性和稳定性。
private static void test1() throws ExecutionException, InterruptedException {
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(executor.getQueue().size() > 100){
System.out.println("线程池任务数量大于100了,要不要限制什么的");
}
}
};
ThreadPoolExecutor s = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4), Executors.defaultThreadFactory(),
rejectedExecutionHandler);
}
在下面示例中,我创建一个线程池。核心线程数为2个,最大线程数为4个(最多只有4个线程在执行任务),线程空闲存活时间为60秒,使用 ArrayBlockingQueue 保存 Runnable 类型任务,使用默认线程创建工厂和调用者运行拒绝策略。然后添加一个100个任务,每个任务耗时5秒。线程池会执行这些任务,至到结束。
private static void test3() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 1; i <= 100 ; i++) {
final int id = i;
executor.execute(new Runnable() {
@Override
public void run() {
someTask(id);
}
});
}
}
private static void someTask(int id){
// 这里是一个复杂的业务逻辑(耗时任务)
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("这里是一个复杂的业务逻辑(耗时任务)"+id);
}