xxl-job使用线程池多线程日志打印问题

发布时间:2024年01月24日

xxl-job中不建议使用@Async,因为@Transaction,@Async在同一个类中注解失效(本类中方法添加这两个注解,然后本类自己其它方法调用有这两注解的方法这两个注解是失效,只能外部类调用注解才会生效)

原因:spring 在扫描bean的时候会扫描方法上是否包含@Async注解,如果包含,spring会为这个bean动态地生成一个子类(即代理类,proxy),代理类是继承原来那个bean的。此时,当这个有注解的方法被调用的时候,实际上是由代理类来调用的,代理类在调用时增加异步作用。然而,如果这个有注解的方法是被同一个类中的其他方法调用的,那么该方法的调用并没有通过代理类,而是直接通过原来的那个bean,所以就没有增加异步作用

有人提出将注解放到类上,但是我试了下放在类上本类调用还是失效的

在开启线程前获取日志路径,传入线程中设置上下文

thread.core-size=100
thread.max-size=200
thread.keep-alive-time=60
thread.queue-capacity=1000000
package com.xxl.job.executor.core.threads;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 *
 * 配置加载
 *
 */
@ConfigurationProperties(prefix = "thread")
@Component
@Data
public class ThreadConfig {
    /**
     * 核心线程数
     */
    private Integer coreSize;

    /**
     * 最大线程数
     */
    private Integer maxSize;

    /**
     * 空闲存活时间
     */
    private Integer keepAliveTime;

    /**
     * 队列容量
     */
    private Integer queueCapacity;
}

package com.xxl.job.executor.core.config;

import com.alibaba.ttl.threadpool.TtlExecutors;
import com.xxl.job.executor.core.threads.ThreadConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

/**
 *
 * 线程池初始化
 *
 */
@Configuration
public class ThreadPoolConfig {

    @Bean("taskPoolExecutor")
    public Executor threadPoolTaskExecutor(ThreadConfig config) {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置核心线程数
        taskExecutor.setCorePoolSize(config.getCoreSize());
        //设置最大线程数
        taskExecutor.setMaxPoolSize(config.getMaxSize());
        //设置队列最大容量
        taskExecutor.setQueueCapacity(config.getQueueCapacity());
        //设置线程空闲时间(秒)
        taskExecutor.setKeepAliveSeconds(config.getKeepAliveTime());
        //设置线程名称前缀
        taskExecutor.setThreadNamePrefix("taskPoolExecutor--");
        //调度器shutdown方法被调用时等待当前被调度的任务完成
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        //等待时间(秒)
        taskExecutor.setAwaitTerminationSeconds(60);
        //拒绝策略
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //允许核心线程超时
        taskExecutor.setAllowCoreThreadTimeOut(true);
        //等待所有任务结束后关闭线程池
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }


}

package com.xxl.job.executor.service.jobhandler.demo;


import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.IJobHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;


public abstract class AbstractJobHandler<T> extends IJobHandler
{
    @Resource(name = "taskPoolExecutor")
    private Executor threadPoolTaskExecutor;

    public void execute() throws Exception
    {
        // 初始化执行数据
        String param = "";
        List<T> datas = this.initExecuteData(param);

        List<CompletableFuture> result = new ArrayList<>();
        // 线程同步工具
        CountDownLatch countDownLatch = new CountDownLatch(datas.size());

        XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
        String logFileName = xxlJobContext.getJobLogFileName();

        for (T data : datas)
        {
            result.add(CompletableFuture.runAsync(new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        // 调用定制执行方法
                        execute(data, param, logFileName);
                        Thread.sleep(100);
                    }
                    catch (Throwable e)
                    {
                        // 获取真正异常信息
                        if (e.getCause() != null)
                        {
                            e = e.getCause();
                        }
                        XxlJobHelper.log(e);
                    }
                    finally
                    {
                        // 计数器递减
                        countDownLatch.countDown();
                    }
                }
            }, threadPoolTaskExecutor));
        }

        CompletableFuture[] arr = new CompletableFuture[result.size()];
        result.toArray(arr);
        CompletableFuture.allOf(arr).get();

        // 等待执行完成
        await(countDownLatch);
    }

    /**
     *
     * 传递变量设置线程日志文件输出
     *
     * @param data
     * @param param
     * @param logfilename
     */
    public void execute(T data, String param, String logfilename){
        XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();
        String logFileName1 = xxlJobContext.getJobLogFileName();
        if(!logFileName1.equals(logfilename)){
            System.err.println("log日志不同:"+logfilename+"[logFileName1]"+logFileName1);
        }else{
            System.err.println("log日同:"+logfilename+"[logFileName1]"+logFileName1);
        }
        XxlJobContext xxlJobContext2 = new XxlJobContext(xxlJobContext.getJobId(), xxlJobContext.getJobParam(), logfilename, xxlJobContext.getShardIndex(), xxlJobContext.getShardTotal());
        XxlJobContext.setXxlJobContext(xxlJobContext2);
        execute(data, param);
    }

    /**
     * 
     * 初始化执行数据
     * 
     * @param param
     * @return
     */
    protected abstract List<T> initExecuteData(String param);

    /**
     *
     * 定制执行方法
     *
     * @param data
     * @param param
     */
    protected abstract void execute(T data, String param);

    /**
     * 
     * 定制等待时长
     * 
     * @param countDownLatch
     * @throws InterruptedException
     */
    protected void await(CountDownLatch countDownLatch) throws InterruptedException
    {
        // 默认等待一个小时
        countDownLatch.await(1, TimeUnit.HOURS);
    }

}

package com.xxl.job.executor.service.jobhandler.demo;

import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * Account账号表实体类
 *
 * @date 2019/12/18
 */
@Data
public class Account implements Serializable
{
    /**
     * 序列化标识
     */
    private static final long serialVersionUID = 1L;

    /**
     * 账号
     */
    private String account;

    /**
     * 站点
     */
    private String marketplaceId;

}

package com.xxl.job.executor.service.jobhandler.demo;


import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 *
 * demo1
 *
 */
@Slf4j
@Component
public class OrderJobHandler extends AbstractJobHandler<Account>
{
    @XxlJob("orderJobHandler")
    @Override
    public void execute() throws Exception
    {
        super.execute();
    }

    @Override
    protected List<Account> initExecuteData(String param)
    {
        // 账号列表
        List<Account> accounts = null;
        accounts = new ArrayList<>();

        for (int i = 0; i <= 1000; i++) {
            Account account1 = new Account();
            account1.setAccount("任务1:" + i);
            account1.setMarketplaceId(String.valueOf(i));
            accounts.add(account1);
        }
        XxlJobHelper.log("Account同步订单 | 账号数量={}", accounts.size());
        return accounts;
    }


    @Override
    public void execute(Account account, String param)
    {
        XxlJobHelper.log("Amazon同步订单开始 | 账号={}, 站点={}", account.getAccount(), account.getMarketplaceId());
        System.out.println(account.getAccount());
        System.out.println("业务doing");

        log.info("执行test方法的线程的名字为:" + Thread.currentThread().getName());

        XxlJobHelper.log("Acount同步订单完成 | 账号={}, 站点={}", account.getAccount(), account.getMarketplaceId());
    }
}

        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- slf4j日志 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

文章来源:https://blog.csdn.net/qq_30920479/article/details/135821840
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。