同步任务,有时候数据量太大,一次执行有可能数据过多,撑爆内存,且内部接口调用也可能因为数据过多导致接口崩溃,分段执行可以很好地避免这些问题。
本文以按小时分段为例,将时间范围划分成多个时间段,对于每个时间段的数据再做分片,每100个数据一组。
需要JDK8、guava。
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
</dependency>
package org.example.schedule;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 同步数据服务
*/
public class SyncDataService {
/**
* 同步指定时间段内的数据。将时间段按小时划分,获取每小时内的数据,执行同步任务
*
* @param beginTime
* @param endTime
*/
public void syncData(LocalDateTime beginTime, LocalDateTime endTime) {
// 每小时区间的开始时间
LocalDateTime intervalBeginTime = beginTime;
// 将时间范围按小时划分
while (intervalBeginTime.isBefore(endTime)) {
// 开始日志打印:
log.info("开始同步本时段数据,intervalBeginTime={},beginTime={},endTime={}", intervalBeginTime, beginTime, endTime);
LocalDateTime intervalPlusOneHour = intervalBeginTime.plusHours(1);
// 每小时区间的结束之间
LocalDateTime intervalEndTime = intervalPlusOneHour.isAfter(endTime) ? endTime : intervalPlusOneHour;
// 同步该1小时范围内的数据
syncIntervalTimeData(intervalBeginTime, intervalEndTime);
// 结束日志打印
log.info("完成本时段同步高清卡口数据,intervalBeginTime={}", intervalBeginTime);
// 下一轮
intervalBeginTime = intervalEndTime;
}
}
/**
* 同步时间段内的数据
*
* @param intervalBeginTime
* @param intervalEndTime
*/
private void syncIntervalTimeData(LocalDateTime intervalBeginTime, LocalDateTime intervalEndTime) {
// 查询该时段数据
List<XxxPO> dataList = xxxDao.query(intervalBeginTime, intervalEndTime);
if (CollectionUtils.isEmpty(dataList)) {
log.info("该时间段内没有数据,intervalBeginTime={},intervalEndTime={}", intervalBeginTime, intervalEndTime);
return;
}
// 将待同步数据分组,每组100个
List<List<XxxPO>> dataListParts = Lists.partition(dataList, 100);
int partIndex = 0;
for (List<XxxPO> dataSubList : dataListParts) {
partIndex++;
log.info("本时段开始同步分片序号:{},共{}个分片。intervalBeginTime={}", partIndex, dataListParts.size(), intervalBeginTime);
Set<Integer> idSet = dataSubList.stream()
.filter(po -> (StringUtils.isNotBlank(po.getId()) && StringUtils.isNumeric(po.getId())))
.map(XxxPO::getId).map(Integer::valueOf).collect(Collectors.toSet());
List<XxxVo> xxxVoList = xxxSevice.getDataInfoByIds(idSet);
if (CollectionUtils.isEmpty(xxxVoList)) {
log.info("本轮同步数据结束,未查询到待同步信息,intervalBeginTime={},intervalEndTime={}", intervalBeginTime,
intervalEndTime);
continue;
}
// 转成map <id,信息>
Map<Integer, XxxVo> dataInfoMap =
xxxVoList.stream().collect(Collectors.toMap(XxxVo::getId, xxxVo -> xxxVo, (k1, k2) -> k2));
List<SyncData> syncDataList = convert2SyncData(dataSubList, dataInfoMap);
for (SyncData syncData : syncDataList) {
xxxDao.insert(syncData);
}
log.info("本时段完成同步分片序号:{},共{}个分片。intervalBeginTime={}", partIndex, dataListParts.size(), intervalBeginTime);
}
}
/**
* 将数据转换成目标数据
*
* @param dataSubList
* @param dataInfoMap
* @return
*/
private List<SyncData> convert2SyncData(List<XxxPO> dataSubList, Map<Integer, XxxVo> dataInfoMap) {
List<SyncData> syncDataList = new ArrayList<>();
for (XxxPO xxxPO : dataSubList) {
SyncData syncData = new SyncData();
syncData.setId(xxxPO.getId());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
syncData.setTime(sdf.format(xxxPO.getTime()));
Integer id = (StringUtils.isNotBlank(xxxPO.getId()) && StringUtils.isNumeric(xxxPO.getId()))
? Integer.valueOf(xxxPO.getId()) : null;
XxxVo xxxVo = dataInfoMap.getOrDefault(id, null);
if (xxxVo != null) {
syncData.setName(xxxVo.getName());
}
syncDataList.add(syncData);
}
return syncDataList;
}
}
当唯一键冲突时,更新数据
<insert id="insert">
INSERT INTO `xx_table`(`id`, `time`, `name`)
VALUES (#{id}, #{time}, #{name})
ON DUPLICATE KEY UPDATE
`time`=#{time},
`name`=#{name},
`name`=#{name}
</insert>