This article takes a look at PowerJob's ThreadPoolConfig.
tech/powerjob/server/config/ThreadPoolConfig.java
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig {

    @Bean(PJThreadPool.TIMING_POOL)
    public TaskExecutor getTimingPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
        // use SynchronousQueue
        executor.setQueueCapacity(0);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("PJ-TIMING-");
        executor.setRejectedExecutionHandler(new NewThreadRunRejectedExecutionHandler(PJThreadPool.TIMING_POOL));
        return executor;
    }

    @Bean(PJThreadPool.BACKGROUND_POOL)
    public AsyncTaskExecutor initBackgroundPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 8);
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 16);
        executor.setQueueCapacity(8192);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("PJ-BG-");
        executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newDiscard(PJThreadPool.BACKGROUND_POOL));
        return executor;
    }

    @Bean(PJThreadPool.LOCAL_DB_POOL)
    public TaskExecutor initOmsLocalDbPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        int tSize = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
        executor.setCorePoolSize(tSize);
        executor.setMaxPoolSize(tSize);
        executor.setQueueCapacity(2048);
        executor.setThreadNamePrefix("PJ-LOCALDB-");
        executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newAbort(PJThreadPool.LOCAL_DB_POOL));
        return executor;
    }

    /**
     * After WebSocket support was introduced, the scheduling thread pool has to be initialized manually.
     */
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(Math.max(Runtime.getRuntime().availableProcessors() * 8, 32));
        scheduler.setThreadNamePrefix("PJ-DEFAULT-");
        scheduler.setDaemon(true);
        return scheduler;
    }
}
ThreadPoolConfig defines three thread pools, PowerJobTimingPool, PowerJobBackgroundPool and PowerJobLocalDbPool, plus a taskScheduler. They are built on Spring's ThreadPoolTaskExecutor and ThreadPoolTaskScheduler, both of which extend ExecutorConfigurationSupport.
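To make the timing pool's settings more concrete, here is a minimal plain-JDK sketch of what a queue capacity of 0 combined with a "run in a new thread" rejection handler amounts to. The queue selection follows ThreadPoolTaskExecutor's behaviour (a positive capacity gives a LinkedBlockingQueue, anything else a SynchronousQueue). The class TimingPoolSketch, the handler RunInNewThreadHandler and the method overflowedTasks are hypothetical names used only for illustration; the handler's behaviour is an assumption inferred from the name NewThreadRunRejectedExecutionHandler, not a copy of PowerJob's code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimingPoolSketch {

    // Queue selection in the style of ThreadPoolTaskExecutor:
    // capacity > 0 -> bounded LinkedBlockingQueue, otherwise SynchronousQueue.
    public static BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
            return new LinkedBlockingQueue<>(queueCapacity);
        }
        return new SynchronousQueue<>();
    }

    // Hypothetical stand-in for NewThreadRunRejectedExecutionHandler (assumed behaviour):
    // a rejected task is not dropped but executed on a freshly created thread.
    static final class RunInNewThreadHandler implements RejectedExecutionHandler {
        final AtomicInteger overflowCount = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
            if (!pool.isShutdown()) {
                new Thread(task, "overflow-" + overflowCount.incrementAndGet()).start();
            }
        }
    }

    // Submits `tasks` blocking tasks to a pool with no queue capacity and
    // returns how many of them had to be handed to the rejection handler.
    public static int overflowedTasks(int core, int max, int tasks) {
        RunInNewThreadHandler handler = new RunInNewThreadHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, createQueue(0), handler);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy until all tasks are submitted
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    done.countDown();
                });
            }
            release.countDown();
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return handler.overflowCount.get();
    }

    public static void main(String[] args) {
        // 1 core thread, 2 max threads, 5 tasks: 2 run in the pool, 3 overflow.
        System.out.println(overflowedTasks(1, 2, 5));
    }
}
```

Because a SynchronousQueue holds no tasks, every submission either finds an idle worker, starts a new thread up to the maximum, or goes straight to the rejection handler. That suits a timing pool, where queuing a task would delay it.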
org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
        implements BeanNameAware, InitializingBean, DisposableBean {

    //......

    /**
     * Calls {@code initialize()} after the container applied all property values.
     * @see #initialize()
     */
    @Override
    public void afterPropertiesSet() {
        initialize();
    }

    /**
     * Set up the ExecutorService.
     */
    public void initialize() {
        if (logger.isDebugEnabled()) {
            logger.debug("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
        }
        if (!this.threadNamePrefixSet && this.beanName != null) {
            setThreadNamePrefix(this.beanName + "-");
        }
        this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    }

    protected abstract ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);

    /**
     * Perform a shutdown on the underlying ExecutorService.
     * @see java.util.concurrent.ExecutorService#shutdown()
     * @see java.util.concurrent.ExecutorService#shutdownNow()
     */
    public void shutdown() {
        if (logger.isDebugEnabled()) {
            logger.debug("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
        }
        if (this.executor != null) {
            if (this.waitForTasksToCompleteOnShutdown) {
                this.executor.shutdown();
            }
            else {
                for (Runnable remainingTask : this.executor.shutdownNow()) {
                    cancelRemainingTask(remainingTask);
                }
            }
            awaitTerminationIfNecessary(this.executor);
        }
    }
}
ExecutorConfigurationSupport implements the InitializingBean and DisposableBean interfaces: its afterPropertiesSet method calls initialize to set up the thread pool, and its destroy method calls shutdown to close it.
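The lifecycle described above can be sketched without Spring. The snippet below is a simplified illustration, not Spring's implementation: LifecycleSketch, ManagedExecutor and cancelledOnDestroy are hypothetical names, and the container callbacks are invoked by hand. It follows the branch taken when waitForTasksToCompleteOnShutdown is false, where shutdownNow() is called and the tasks still waiting in the queue are returned instead of being run.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LifecycleSketch {

    // Hypothetical, heavily simplified stand-in for ExecutorConfigurationSupport.
    static class ManagedExecutor {
        private final boolean waitForTasksToCompleteOnShutdown;
        private ExecutorService executor;
        private int cancelledTasks;

        ManagedExecutor(boolean waitForTasksToCompleteOnShutdown) {
            this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
        }

        // Plays the role of InitializingBean#afterPropertiesSet.
        void afterPropertiesSet() {
            initialize();
        }

        void initialize() {
            this.executor = Executors.newFixedThreadPool(1);
        }

        // Plays the role of DisposableBean#destroy.
        void destroy() {
            shutdown();
        }

        void shutdown() {
            if (this.executor == null) {
                return;
            }
            if (this.waitForTasksToCompleteOnShutdown) {
                this.executor.shutdown(); // queued tasks still run
            } else {
                // running tasks are interrupted, queued tasks are handed back
                List<Runnable> remaining = this.executor.shutdownNow();
                this.cancelledTasks = remaining.size();
            }
        }
    }

    // Drives the lifecycle by hand and returns how many queued tasks were dropped on destroy.
    public static int cancelledOnDestroy() {
        ManagedExecutor bean = new ManagedExecutor(false);
        bean.afterPropertiesSet(); // a container would call this after setting properties

        CountDownLatch never = new CountDownLatch(1);
        // The first task occupies the only worker until it is interrupted.
        bean.executor.execute(() -> {
            try {
                never.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // These two tasks wait in the queue behind the first one.
        bean.executor.execute(() -> { });
        bean.executor.execute(() -> { });

        bean.destroy(); // a container would call this when the context closes
        return bean.cancelledTasks;
    }

    public static void main(String[] args) {
        System.out.println(cancelledOnDestroy()); // prints 2
    }
}
```

In a real application none of these calls are written by hand: returning the executor from a @Bean method is enough for the container to run both callbacks.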
To sum up, ThreadPoolConfig defines three thread pools, PowerJobTimingPool, PowerJobBackgroundPool and PowerJobLocalDbPool, plus a taskScheduler, using Spring's ThreadPoolTaskExecutor and ThreadPoolTaskScheduler. Both extend ExecutorConfigurationSupport, which implements InitializingBean and DisposableBean, so the Spring container initializes and destroys the pools automatically.