Java 执行同步任务时 将大的时间段段拆分成多个小的时间段

发布时间:2024年01月17日

同步任务,有时候数据量太大,一次执行有可能数据过多,撑爆内存,且内部接口调用也可能因为数据过多导致接口崩溃,分段执行可以很好地避免这些问题。

本文以按小时分段为例,将时间范围划分成多个时间段,对于每个时间段的数据再做分片,每100个数据一组。

需要JDK8、guava。

<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.0.0-jre</version>
</dependency>

一、同步数据任务

package org.example.schedule;

import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * 同步数据服务
 */
public class SyncDataService {
    /**
     * 同步指定时间段内的数据。将时间段按小时划分,获取每小时内的数据,执行同步任务
     *
     * @param beginTime
     * @param endTime
     */
    public void syncData(LocalDateTime beginTime, LocalDateTime endTime) {
        // 每小时区间的开始时间
        LocalDateTime intervalBeginTime = beginTime;

        // 将时间范围按小时划分
        while (intervalBeginTime.isBefore(endTime)) {
            // 开始日志打印:
            log.info("开始同步本时段数据,intervalBeginTime={},beginTime={},endTime={}", intervalBeginTime, beginTime, endTime);

            LocalDateTime intervalPlusOneHour = intervalBeginTime.plusHours(1);
            // 每小时区间的结束之间
            LocalDateTime intervalEndTime = intervalPlusOneHour.isAfter(endTime) ? endTime : intervalPlusOneHour;

            // 同步该1小时范围内的数据
            syncIntervalTimeData(intervalBeginTime, intervalEndTime);

            // 结束日志打印
            log.info("完成本时段同步高清卡口数据,intervalBeginTime={}", intervalBeginTime);

            // 下一轮
            intervalBeginTime = intervalEndTime;
        }
    }

    /**
     * 同步时间段内的数据
     *
     * @param intervalBeginTime
     * @param intervalEndTime
     */
    private void syncIntervalTimeData(LocalDateTime intervalBeginTime, LocalDateTime intervalEndTime) {
        // 查询该时段数据
        List<XxxPO> dataList = xxxDao.query(intervalBeginTime, intervalEndTime);
        if (CollectionUtils.isEmpty(dataList)) {
            log.info("该时间段内没有数据,intervalBeginTime={},intervalEndTime={}", intervalBeginTime, intervalEndTime);
            return;
        }

        // 将待同步数据分组,每组100个
        List<List<XxxPO>> dataListParts = Lists.partition(dataList, 100);
        int partIndex = 0;
        for (List<XxxPO> dataSubList : dataListParts) {
            partIndex++;
            log.info("本时段开始同步分片序号:{},共{}个分片。intervalBeginTime={}", partIndex, dataListParts.size(), intervalBeginTime);
            Set<Integer> idSet = dataSubList.stream()
                .filter(po -> (StringUtils.isNotBlank(po.getId()) && StringUtils.isNumeric(po.getId())))
                .map(XxxPO::getId).map(Integer::valueOf).collect(Collectors.toSet());
            List<XxxVo> xxxVoList = xxxSevice.getDataInfoByIds(idSet);
            if (CollectionUtils.isEmpty(xxxVoList)) {
                log.info("本轮同步数据结束,未查询到待同步信息,intervalBeginTime={},intervalEndTime={}", intervalBeginTime,
                    intervalEndTime);
                continue;
            }
            // 转成map <id,信息>
            Map<Integer, XxxVo> dataInfoMap =
                xxxVoList.stream().collect(Collectors.toMap(XxxVo::getId, xxxVo -> xxxVo, (k1, k2) -> k2));

            List<SyncData> syncDataList = convert2SyncData(dataSubList, dataInfoMap);
            for (SyncData syncData : syncDataList) {
                xxxDao.insert(syncData);
            }
            log.info("本时段完成同步分片序号:{},共{}个分片。intervalBeginTime={}", partIndex, dataListParts.size(), intervalBeginTime);
        }
    }

    /**
     * 将数据转换成目标数据
     * 
     * @param dataSubList
     * @param dataInfoMap
     * @return
     */
    private List<SyncData> convert2SyncData(List<XxxPO> dataSubList, Map<Integer, XxxVo> dataInfoMap) {
        List<SyncData> syncDataList = new ArrayList<>();
        for (XxxPO xxxPO : dataSubList) {
            SyncData syncData = new SyncData();
            syncData.setId(xxxPO.getId());
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            syncData.setTime(sdf.format(xxxPO.getTime()));
            Integer id = (StringUtils.isNotBlank(xxxPO.getId()) && StringUtils.isNumeric(xxxPO.getId()))
                ? Integer.valueOf(xxxPO.getId()) : null;
            XxxVo xxxVo = dataInfoMap.getOrDefault(id, null);
            if (xxxVo != null) {
                syncData.setName(xxxVo.getName());
            }

            syncDataList.add(syncData);
        }

        return syncDataList;
    }
}

二、mybatis插入数据

当唯一键冲突时,更新数据

<insert id="insert">
    INSERT INTO `xx_table`(`id`, `time`, `name`)
    VALUES (#{id}, #{time}, #{name})
    ON DUPLICATE KEY UPDATE
        `time`=#{time},
        `name`=#{name},
        `name`=#{name}
</insert>
文章来源:https://blog.csdn.net/u010266988/article/details/135642598
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。