以同步全量MySQL数据到ElasticSearch为例。
业务逻辑:
public boolean syncToElasticsearch() {
log.info("Starting data synchronization to Elasticsearch.");
// 获取最大id和最小id
IdRange idRange = newsMapper.getIdRange();
log.info("idRange is:{}", JSON.toJSONString(idRange));
if (idRange == null || idRange.getMinId() <= 0 || idRange.getMaxId() <= 0) {
log.warn("Invalid id range or no data found in MySQL. Sync process aborted.");
return false;
}
long pageSize = 200L;
long startId = idRange.getMinId();
try {
// 循环处理所有数据
while (startId <= idRange.getMaxId()) {
// 业务逻辑 可以替换成自己需要的
log.info("syncToElasticsearch startId:{}", startId);
List<News> newsList = newsService.getByIdRange(startId, pageSize);
log.info("syncToElasticsearch newsList size:{}", newsList.size());
if (CollectionUtils.isEmpty(newsList)) {
break;
}
newsEsService.bulkUpsertToElasticsearch(getNewsEsDTOList(newsList));
// 更新startId
startId = newsList.get(newsList.size() - 1).getId() + 1;
log.info("Synced {} Newss to Elasticsearch, current id is:{}", newsList.size(), startId);
}
} catch (Exception e) {
log.error("Error occurred during News data synchronization to Elasticsearch.", e);
return false;
}
log.info("Data synchronization to Elasticsearch completed.");
return true;
}
mapper:
public interface NewsMapper extends BaseMapper<News> {
/**
* 获取最小和最大id值的范围
*
* @return
*/
@Select("SELECT MIN(id) AS minId, MAX(id) AS maxId FROM news")
IdRange getIdRange();
}
newsService:
public List<News> getByIdRange(long startId, long pageSize) {
if (startId <= 0 || pageSize <= 0) {
return Collections.emptyList();
}
return this.list(new LambdaQueryWrapper<News>()
.ge(News::getId, startId)
.orderByAsc(News::getId)
.last("limit " + pageSize));
}
实体类定义:
@Data
public class IdRange {
private Long minId;
private Long maxId;
}
页面深度越大查询性能越慢,当表有大量数据时处理后面的数据会很耗时。