线程池调度类:
package com.zhou.pool;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@Slf4j
public class DefaultThreadPool extends ScheduledThreadPoolExecutor {
@Getter
@Setter
private ThreadTrigger trigger = null;
public DefaultThreadPool(int corePoolSize, int maximumPoolSize) {
super(corePoolSize,new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(),
new CallerRunsPolicy());
setMaximumPoolSize(maximumPoolSize);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(trigger != null){
try{
trigger.beforeExecute(t,r);
}catch (Throwable e){
log.error("[beforeExecute]trigger failed.",e);
}
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
if(trigger != null){
try{
trigger.afterExecute(r,t);
}catch (Throwable e){
log.error("[afterExecute]trigger failed.",e);
}
}
}
/**
* 关闭线程池前执行
*/
@Override
protected void terminated() {
if(trigger != null){
try{
trigger.beforeShutdown();
}catch (Throwable e){
log.error("[terminated]trigger failed.",e);
}
}
}
}
调度监听器:
package com.zhou.pool;
/**
* 全局监听器
*/
public interface ThreadTrigger {
/**
* 线程执行任务前执行
*/
void beforeExecute(Thread t, Runnable r);
/**
* 执行任务后执行
*/
void afterExecute(Runnable r, Throwable t);
/**
* 关闭线程池前执行
*/
void beforeShutdown();
}
package com.zhou.pool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* 线程池配置类
*/
@Slf4j
@Configuration
public class ThreadPoolConfig {
@Bean("scheduledExecutorService")
public DefaultThreadPool getPool(Environment environment){
Integer poolSize = NumberTool.safeToInteger(environment.getProperty("investment.poolSize"), 3);
Integer maxPoolSize = NumberTool.safeToInteger(environment.getProperty("investment.maxPoolSize"), 12);
if(maxPoolSize < 3){
maxPoolSize = 3;
}
if(maxPoolSize > 20){
maxPoolSize = 20;
}
if(poolSize > 12){
poolSize = 12;
}
if(poolSize > maxPoolSize){
poolSize = maxPoolSize;
}
if(poolSize < 1){
poolSize = 1;
}
log.info("线程池大小:poolSize={},maxPoolSize={}",poolSize,maxPoolSize);
DefaultThreadPool pool = new DefaultThreadPool(poolSize, maxPoolSize);
return pool;
}
}
调度任务提交类:
package com.zhou.pool;
//import com.zhou.framework.common.spring.SpringFactory;
import lombok.extern.slf4j.Slf4j;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* @author lang.zhou
*/
@Slf4j
public class ThreadPoolManager {
private static DefaultThreadPool pool = SpringFactory.getBean(DefaultThreadPool.class);
/**
* 延迟执行毫秒数
*/
private static final long delay = 10L;
/**
* 异步执行任务
* @param task 任务
*/
public static void execute(Runnable task) {
pool.schedule(task, delay, TimeUnit.MILLISECONDS);
}
/**
* 异步执行任务
* @param callable 任务
*/
public static <T> Future<T> asyncRun(Callable<T> callable) {
Future<T> scheduledFuture = pool.submit(callable);
return scheduledFuture;
}
}
调用:
ThreadPoolManager.execute(() -> {});