xxl-job中不建议使用@Async,因为@Transaction,@Async在同一个类中注解失效(本类中方法添加这两个注解,然后本类自己其它方法调用有这两注解的方法这两个注解是失效,只能外部类调用注解才会生效)
原因:spring 在扫描bean的时候会扫描方法上是否包含@Async注解,如果包含,spring会为这个bean动态地生成一个子类(即代理类,proxy),代理类是继承原来那个bean的。此时,当这个有注解的方法被调用的时候,实际上是由代理类来调用的,代理类在调用时增加异步作用。然而,如果这个有注解的方法是被同一个类中的其他方法调用的,那么该方法的调用并没有通过代理类,而是直接通过原来的那个bean,所以就没有增加异步作用
有人提出将注解放到类上,但是我试了下放在类上本类调用还是失效的
在开启线程前获取日志路径,传入线程中设置上下文
thread.core-size=100
thread.max-size=200
thread.keep-alive-time=60
thread.queue-capacity=1000000
package com.xxl.job.executor.core.threads;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
*
* 配置加载
*
*/
@ConfigurationProperties(prefix = "thread")
@Component
@Data
public class ThreadConfig {
/**
* 核心线程数
*/
private Integer coreSize;
/**
* 最大线程数
*/
private Integer maxSize;
/**
* 空闲存活时间
*/
private Integer keepAliveTime;
/**
* 队列容量
*/
private Integer queueCapacity;
}
package com.xxl.job.executor.core.config;
import com.alibaba.ttl.threadpool.TtlExecutors;
import com.xxl.job.executor.core.threads.ThreadConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.*;
/**
*
* 线程池初始化
*
*/
@Configuration
public class ThreadPoolConfig {
@Bean("taskPoolExecutor")
public Executor threadPoolTaskExecutor(ThreadConfig config) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置核心线程数
taskExecutor.setCorePoolSize(config.getCoreSize());
//设置最大线程数
taskExecutor.setMaxPoolSize(config.getMaxSize());
//设置队列最大容量
taskExecutor.setQueueCapacity(config.getQueueCapacity());
//设置线程空闲时间(秒)
taskExecutor.setKeepAliveSeconds(config.getKeepAliveTime());
//设置线程名称前缀
taskExecutor.setThreadNamePrefix("taskPoolExecutor--");
//调度器shutdown方法被调用时等待当前被调度的任务完成
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//等待时间(秒)
taskExecutor.setAwaitTerminationSeconds(60);
//拒绝策略
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//允许核心线程超时
taskExecutor.setAllowCoreThreadTimeOut(true);
//等待所有任务结束后关闭线程池
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
}
package com.xxl.job.executor.service.jobhandler.demo;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.IJobHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public abstract class AbstractJobHandler<T> extends IJobHandler
{
@Resource(name = "taskPoolExecutor")
private Executor threadPoolTaskExecutor;
public void execute() throws Exception
{
// 初始化执行数据
String param = "";
List<T> datas = this.initExecuteData(param);
List<CompletableFuture> result = new ArrayList<>();
// 线程同步工具
CountDownLatch countDownLatch = new CountDownLatch(datas.size());
XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
String logFileName = xxlJobContext.getJobLogFileName();
for (T data : datas)
{
result.add(CompletableFuture.runAsync(new Runnable()
{
@Override
public void run()
{
try
{
// 调用定制执行方法
execute(data, param, logFileName);
Thread.sleep(100);
}
catch (Throwable e)
{
// 获取真正异常信息
if (e.getCause() != null)
{
e = e.getCause();
}
XxlJobHelper.log(e);
}
finally
{
// 计数器递减
countDownLatch.countDown();
}
}
}, threadPoolTaskExecutor));
}
CompletableFuture[] arr = new CompletableFuture[result.size()];
result.toArray(arr);
CompletableFuture.allOf(arr).get();
// 等待执行完成
await(countDownLatch);
}
/**
*
* 传递变量设置线程日志文件输出
*
* @param data
* @param param
* @param logfilename
*/
public void execute(T data, String param, String logfilename){
XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
String logFileName1 = xxlJobContext.getJobLogFileName();
if(!logFileName1.equals(logfilename)){
System.err.println("log日志不同:"+logfilename+"[logFileName1]"+logFileName1);
}else{
System.err.println("log日同:"+logfilename+"[logFileName1]"+logFileName1);
}
XxlJobContext xxlJobContext2 = new XxlJobContext(xxlJobContext.getJobId(), xxlJobContext.getJobParam(), logfilename, xxlJobContext.getShardIndex(), xxlJobContext.getShardTotal());
XxlJobContext.setXxlJobContext(xxlJobContext2);
execute(data, param);
}
/**
*
* 初始化执行数据
*
* @param param
* @return
*/
protected abstract List<T> initExecuteData(String param);
/**
*
* 定制执行方法
*
* @param data
* @param param
*/
protected abstract void execute(T data, String param);
/**
*
* 定制等待时长
*
* @param countDownLatch
* @throws InterruptedException
*/
protected void await(CountDownLatch countDownLatch) throws InterruptedException
{
// 默认等待一个小时
countDownLatch.await(1, TimeUnit.HOURS);
}
}
package com.xxl.job.executor.service.jobhandler.demo;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* Account账号表实体类
*
* @date 2019/12/18
*/
@Data
public class Account implements Serializable
{
/**
* 序列化标识
*/
private static final long serialVersionUID = 1L;
/**
* 账号
*/
private String account;
/**
* 站点
*/
private String marketplaceId;
}
package com.xxl.job.executor.service.jobhandler.demo;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
*
* demo1
*
*/
@Slf4j
@Component
public class OrderJobHandler extends AbstractJobHandler<Account>
{
@XxlJob("orderJobHandler")
@Override
public void execute() throws Exception
{
super.execute();
}
@Override
protected List<Account> initExecuteData(String param)
{
// 账号列表
List<Account> accounts = null;
accounts = new ArrayList<>();
for (int i = 0; i <= 1000; i++) {
Account account1 = new Account();
account1.setAccount("任务1:" + i);
account1.setMarketplaceId(String.valueOf(i));
accounts.add(account1);
}
XxlJobHelper.log("Account同步订单 | 账号数量={}", accounts.size());
return accounts;
}
@Override
public void execute(Account account, String param)
{
XxlJobHelper.log("Amazon同步订单开始 | 账号={}, 站点={}", account.getAccount(), account.getMarketplaceId());
System.out.println(account.getAccount());
System.out.println("业务doing");
log.info("执行test方法的线程的名字为:" + Thread.currentThread().getName());
XxlJobHelper.log("Acount同步订单完成 | 账号={}, 站点={}", account.getAccount(), account.getMarketplaceId());
}
}
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.1</version>
</dependency>
<!-- slf4j日志 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>