Hudi Clustering对于在数据写入和读取提供一套相对完善的解决方案。它的核心思想就是: 在数据写入时,运行并发写入多个小文件,从而提升写入的性能;同时通过一个异步(也可以配置同步,但不推荐)进程或者周期性调度来执行小文件合并成大文件在这个过程中hudi还考虑到对数据按照特定的列进行重排序,这样在解决小文件问题的同时还优化了查询性能,可谓是“两全其美”。对于Clustering的手法其实是一种比较通用的优化数据重新布局的手段。其中在Hive/Spark SQL中都有类似的操作cluster by,只是在hudi中更加追求完美,多了一项合并小文件工作。关于cluster的几个配置参数:
配置项 | 默认值 | 说明 |
---|---|---|
hoodie.clustering.inline | false | |
hoodie.clustering.schedule.inline | false | |
hoodie.clustering.async.enabled | false | |
hoodie.clustering.inline.max.commits | 4 | |
hoodie.clustering.async.max.commits | 4 | |
hoodie.clustering.plan.strategy.small.file.limit | 314572800 ( 300MB ) | 只有小于该值的文件才会被视为小文件,从而参与到 Clustering 中。 |
hoodie.clustering.plan.strategy.target.file.max.bytes | 1073741824 ( 1GB ) | 限制 Clustering 生成的文件大小,默认是 1GB。合并后的最大文件不会超过该值。 |
hoodie.clustering.plan.strategy.sort.columns | -- | 针对哪个列重新进行排序。对于该字段过滤条件的查询有很大性能提高。 |
Clustering 的执行机制和compaction的机制类似,都是分为Schedule和execute两个阶段。计划的阶段主要是规划哪些文件参与Clusetring,然后生成一个计划Clusetring Plan保存到Timeline,Timeline中的Instant会有一个replacecommit的值,状态是REQUESTED ;执行阶段主要工作是读取Timeline中的计划,执行完毕,最后将replace commit改为COMPLETED状态。
和compaction一样。Clustering运行模式分为:同步、异步、半异步(为本文的一种叫法,在hudi官网没有体现。)他们之前的差异主要体现在从提交到计划到执行的的三个阶段的推进上。
同步模式可概括为:立即计划,立即执行(Inline Schedule,Inline Execute)。在该模式下,当累积的提交(Commit)次数到达一个阈值时,会立即触发 Clustering 的计划与执行(计划和执行是连在一起的),而这个阈值是由配置项 hoodie.clustering.inline.max.commits 控制的,默认值是 4,即:默认情况下,每提交 4 次就(有可能)会触发并执行一次 Clustering。锁定同步模式的配置是:
配置项 | 参数 |
---|---|
hoodie.clustering.inline | true |
hoodie.clustering.schedule.inline | false |
hoodie.clustering.async.enabled | False |
异步模式可概括为:另行计划,另行执行(Offline Schedule,Offline Execute)。在该模式下,任何提交都不会直接触发和执行 Clustering,除非使用支持异步 Clustering 的 Writer,否则用户需要自己保证有一个独立的进程或线程负责定期执行 Clustering 操作。在异步模式下,由于发起计划和提交之间没有必然的协同关系,所以在发起计划时,Timeline 中可能尚未积累到足够数量的提交,或者提交数量已经超过了规定阈值,如果是前者,不会产生计划计划,如果是后者,计划计划会将所有累积的提交涵盖进来,在这一点上,Clustering 和 Compaction 的处理方式是一致的。锁定异步模式的配置是:
配置项 | 设定值 |
---|---|
hoodie.clustering.inline | false |
hoodie.clustering.schedule.inline | false |
hoodie.clustering.async.enabled | true |
半异步模式可概括为:立即计划,另行执行(Inline Schedule,Offline Execute),即:计划会伴随提交自动触发,但执行还是通过前面介绍的三种异步方式之一去完成。简单总结一下半异步的设计思想:它在每次提交时都会尝试生成计划,如果此前已经生成了计划且尚未执行,则放弃计划,等待其被执行,当异步进程或线程完成执行作业时,紧接着的下一次提交会立即生成新的计划,这样,整个 Clustering 的“节奏”就由异步的执行程序来掌控了。锁定半异步模式的配置是:
配置项 | 设定值 |
---|---|
hoodie.clustering.inline | False |
hoodie.clustering.schedule.inline | true |
hoodie.clustering.async.enabled | false |
Clustering 在排期和执行上都有可插拔的策略,以及在执行期间如何应对数据更新也有相应的更新策略,执行策略和更新策略较为简单,使用默认配置即可,本文不再赘述,详情可参考官方文档。本文着重介绍一下排期策略。Hudi 有三种 Clustering 排期策略可供选择:
SparkSizeBasedClusteringPlanStrategy:该策略为默认的排期策略,它会筛选出符合条件的小文件(就是看文件大小,小于 clustering.plan.strategy.small.file.limit 规定值的文件就是小文件),然后将选出的小文件分成多个 Group,Group 的数量和大小都是可配置的,划分 Group 的目的是提升 Clustering 的并行度。注意:该策略将会扫描全部分区。
SparkRecentDaysClusteringPlanStrategy:该策略会在此前 N 天的分区内查找小文件,对于使用日期作分区,且数据增量是可预期的数据表来说,这种策略是非常适合的。如果在这种情况下使用默认排期策略,就会扫描全部分区,给系统带来没有必要的负载。
SparkSelectedPartitionsClusteringPlanStrategy:该策略允许我们针对特定的分区进行 Clustering,这可能会应用在运维或某些具有独特业务特征的数据表上。
hoodie.clustering.plan.strategy.sort.columns 用于指定在 Clustering 过程中针对哪个列重新进行排序,这也是前文重点解释的 Clustering 能提升数据读取性能的关键。该列的选择对提升查询效率非常重要,通常会选择查询频率最高的条件列。尽管该配置项支持多列,但如果配置了两个或更多列的话,对于那些排在第一列后面的列来说,以它们为条件的查询并不能从中获得太多收益,这和在 HBase 中拼接列值到 Rowkey 中以提升检索性能是一样的。不过,Hudi 提供了以 z-order 和 hilbert 为代表的空间填充曲线技术用于解决多列排序问题。
关闭parquet小文件检查;将hoodie.parquet.small.file.limit置为0。这样做hudi将会把所有的文件认为是大文件。任何数据在写入的时候都不在发生copy-on-write的copy的操作。而是直接写入新的文件,这样减少了写入操作的负担。所以产生的小文件就是Clustering就要去解决的事情。
配置项 | 默认值 | 设定值 |
---|---|---|
hoodie.clustering.inline | false | True |
hoodie.clustering.schedule.inline | false | false |
hoodie.clustering.async.enabled | False | False |
hoodie.clustering.async.enabled | 4 | 2 |
hoodie.clustering.async.enabled | 314572800 ( 300MB ) | 314572800 ( 300MB ) |
hoodie.clustering.async.enabled | 1073741824 ( 1GB ) | 1073741824 ( 1GB ) |
hoodie.parquet.small.file.limit | 104857600 ( 100MB ) | 0 |
create table small_file_hudi_cow (
id int,
name string,
age int,
city STRING,
date_str STRING
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'id',
'hoodie.clustering.inline' = true,
'hoodie.clustering.schedule.inline' = false,
'hoodie.clustering.async.enabled' = false,
'hoodie.clustering.inline.max.commits' = 2,
'hoodie.clustering.plan.strategy.small.file.limit' = 314572800,
'hoodie.clustering.plan.strategy.target.file.max.bytes' = 1073741824,
'hoodie.parquet.small.file.limit' = '0'
)
partitioned by (date_str);
步骤 | 操作 | 文件系统 | 导入或者更新操作 |
---|---|---|---|
1 | insert | base file | INSERT INTO small_file_hudi_cow SELECT id, name, age, city, event_date FROM sample_data_partitioned where event_date='2023-11-02'; |
2 | update | base file | INSERT INTO small_file_hudi_cow SELECT id, name, age, city, event_date FROM sample_data_partitioned where event_date='2023-11-02'; |
3 | update | clustering +base file | INSERT INTO small_file_hudi_cow SELECT id, name, age, city, event_date FROM sample_data_partitioned where event_date='2023-11-02'; |
第一步:insert
Hudi 将其写入到一个 Parquet 文件中,第一组 File Group 随之产生。其文件信息及时间线如下:
第二步:
由于全量更新第一次的所有数据文件,更新后添加对应的一组 file 信息。其文件信息及时间线如下:
第三步:
在做一次全量的数据更新,同时设置了clustering 模式的最大提交次数为2,所以此次提交触发clustering 机制。自动发起了名为 replacecommit 提交,然后预计合并后的数据进行更新的commit 信息。其文件信息及时间线如下: