hudi提供了很多项数据服务来管理表中的数据,其中有一项服务称之为Cleaner(数据清理服务)。随着用户向表中写入的数据越多,对于每一次的更新,hudi都会产生一个版本的数据文件保存更新后的记录(COPY_ON_WRITE)或者是将这些增量更新的数据文件写入日志文件以避免重写更新版本的数据文件(MERGE-ON_READ)。在这个情况下,随着更新频率的增加,数据版本文件无限增长。但如果不需要保留无限的历史记录,则必须有一个流程(服务)来回收旧版本的数据,这就是 Hudi 的清理服务。以下是与 Cleaning 有关的几项重要配置,更详细的配置参数见:
https://hudi.apache.org/docs/configurations/#Clean-Configs
在后面的介绍中我们会逐一介绍它们的作用:
配置项 | 参数 | 说明 |
hoodie.clean.automatic | true | 是否开启自动清理 |
hoodie.clean.async | true | 是否异步清理 |
hoodie.cleaner.policy | KEEP_LATEST_COMMITS | 清理保留策略,可选项:KEEP_LATEST_COMMITS,KEEP_LATEST_FILE_VERSIONS,KEEP_LATEST_BY_HOURS |
hoodie.cleaner.commits.retained | 10 | 在最新时间线之前,保留的时间线个数。 |
hoodie.cleaner.parallelism | 200 | 执行清理的并发数 |
这是hudi默认的策略,该清理策略可确保回溯前X次提交中发生的所有更改。假设每 30 分钟将数据摄取到 Hudi 数据集,并且最长的运行查询可能需要 5 小时才能完成,那么用户应该至少保留最后 10 次提交。通过这样的配置,我们确保文件的最旧版本在磁盘上保留至少 5 小时,从而防止运行时间最长的查询在任何时间点失败,使用此策略也可以进行增量清理。
策略具有保持 N 个文件版本而不受时间限制的效果。当知道在任何给定时间想要保留多少个 MAX 版本的文件时,此策略很有用,为了实现与以前相同的防止长时间运行的查询失败的行为,应该根据数据模式进行计算,或者如果用户只想维护文件的 1 个最新版本,此策略也很有用。
根据小时数清理,默认保留最近24小时的文件。
测试用的数据表有如下几项关键配置
配置项 | 设定值 |
hoodie.clean.automatic | true |
hoodie.cleaner.commits.retained | 1 |
建表语句:
create table small_file_hudi_cow (
id int,
name string,
age int,
city STRING,
date_str STRING
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'id',
'hoodie.clean.automatic' = 'true',
'hoodie.cleaner.commits.retained' = '1'
)
partitioned by (date_str);
执行计划
步骤 | 操作 | 文件系统 | 导入或更新数据命令 |
1 | Inset | +1 base file | INSERT INTO small_file_hudi_cow SELECT id, name, age, city, event_date FROM sample_data_partitioned where event_date='2023-11-03'; |
2 | upsert | +1 base file | INSERT INTO small_file_hudi_cow SELECT id, name, age, city, event_date FROM sample_data_partitioned where event_date='2023-11-03'; |
3 | upsert? | +1 base file | INSERT INTO small_file_hudi_cow SELECT id, name, age, city, event_date FROM sample_data_partitioned where event_date='2023-11-03'; |
第一步insert:
Hudi 将数据写入到一个parquet文件,随后在文件系统产生数据:
?第二步updert
全量更新数据,产生一个新的版本文件。
第三步update.
由于全量更新第一次的所有数据文件,更新后添加对应的一组 file 信息。同时由于设置了同步的清理,设置的保留版本数为1(当前版本之前保留 1 个版本),因此在执行完本次更新后,会添加一个清理的版本信息,进行历史版本时间线数据的清理。其文件信息及时间线如下