MySQL按序批量操作大量数据

发布时间:2023年12月20日

MySQL按序批量操作大量数据(Java、springboot、mybatisplus、ElasticSearch)

以同步全量MySQL数据到ElasticSearch为例。

核心代码

业务逻辑:

 public boolean syncToElasticsearch() {
        log.info("Starting data synchronization to Elasticsearch.");
        // 获取最大id和最小id
        IdRange idRange = newsMapper.getIdRange();
        log.info("idRange is:{}", JSON.toJSONString(idRange));
        if (idRange == null || idRange.getMinId() <= 0 || idRange.getMaxId() <= 0) {
            log.warn("Invalid id range or no data found in MySQL. Sync process aborted.");
            return false;
        }
        long pageSize = 200L;
        long startId = idRange.getMinId();
        try {
        	// 循环处理所有数据
            while (startId <= idRange.getMaxId()) {
            	// 业务逻辑 可以替换成自己需要的
                log.info("syncToElasticsearch startId:{}", startId);
                List<News> newsList = newsService.getByIdRange(startId, pageSize);
                log.info("syncToElasticsearch newsList size:{}", newsList.size());
                if (CollectionUtils.isEmpty(newsList)) {
                    break;
                }
                newsEsService.bulkUpsertToElasticsearch(getNewsEsDTOList(newsList));

                // 更新startId
                startId = newsList.get(newsList.size() - 1).getId() + 1;
                log.info("Synced {} Newss to Elasticsearch, current id is:{}", newsList.size(), startId);
            }
        } catch (Exception e) {
            log.error("Error occurred during News data synchronization to Elasticsearch.", e);
            return false;
        }
        log.info("Data synchronization to Elasticsearch completed.");
        return true;
    }

mapper:

public interface NewsMapper extends BaseMapper<News> {

    /**
     * 获取最小和最大id值的范围
     *
     * @return
     */
    @Select("SELECT MIN(id) AS minId, MAX(id) AS maxId FROM news")
    IdRange getIdRange();
}

newsService:

    public List<News> getByIdRange(long startId, long pageSize) {
        if (startId <= 0 || pageSize <= 0) {
            return Collections.emptyList();
        }
        return this.list(new LambdaQueryWrapper<News>()
                .ge(News::getId, startId)
                .orderByAsc(News::getId)
                .last("limit " + pageSize));
    }

实体类定义:

@Data
public class IdRange {
    private Long minId;
    private Long maxId;
}

为什么不直接用分页?

页面深度越大查询性能越慢,当表有大量数据时处理后面的数据会很耗时。

文章来源:https://blog.csdn.net/Ning_chuan/article/details/135018687
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。