Flink 社区希望能够将 Flink 的 Streaming 实时计算能力和 Lakehouse 新架构优势进一步结合,推出新一代的 Streaming Lakehouse 技术,促进数据在数据湖上真正实时流动起来,并为用户提供实时离线一体化的开发体验。Flink 社区内部孵化了 Flink Table Store (简称 FTS) 子项目,一个真正面向 Streaming 以及 Realtime的数据湖存储项目。2023年3月12日,FTS进入 Apache 软件基金会 (ASF) 的孵化器,改名为 Apache Paimon (incubating)。
Apache Paimon是一个流数据湖平台,具有高速数据摄取、变更日志跟踪和高效的实时分析的能力。
1)统一批处理和流处理
批量写入和读取、流式更新、变更日志生成,全部支持。
2)数据湖能力
低成本、高可靠性、可扩展的元数据。 Apache Paimon 具有作为数据湖存储的所有优势。
3)各种合并引擎
按照您喜欢的方式更新记录。保留最后一条记录、进行部分更新或将记录聚合在一起,由您决定。
4)变更日志生成
Apache Paimon 可以从任何数据源生成正确且完整的变更日志,从而简化您的流分析。
5)丰富的表类型
除了主键表之外,Apache Paimon还支持append-only表,提供有序的流式读取来替代消息队列。
6)模式演化
Apache Paimon 支持完整的模式演化。您可以重命名列并重新排序。
Paimon 采用 LSM 树(日志结构合并树)作为文件存储的数据结构。
1) Sorted Runs
LSM 树将文件组织成多个Sorted Run。Sorted Run由一个或多个数据文件组成,并且每个数据文件恰好属于一个Sorted Run。
不同的Sorted Run可能具有重叠的主键范围,甚至可能包含相同的主键。查询LSM树时,必须合并所有Sorted Run,并且必须根据用户指定的合并引擎和每条记录的时间戳来合并具有相同主键的所有记录。
写入LSM树的新记录将首先缓存在内存中。当内存缓冲区满时,内存中的所有记录将被排序并刷新到磁盘。
2) Compaction
当越来越多的记录写入LSM树时,Sorted Run的数量将会增加。由于查询LSM树需要将所有Sorted Run合并起来,太多Sorted Run将导致查询性能较差,甚至内存不足。
为了限制Sorted Run的数量,我们必须偶尔将多个Sorted Run合并为一个大的Sorted Run。这个过程称为Compaction。
然而,Compaction是一个资源密集型过程,会消耗一定的CPU时间和磁盘IO,因此过于频繁的Compaction可能会导致写入速度变慢。这是查询和写入性能之间的权衡。 Paimon 目前采用了类似于 Rocksdb 通用压缩的Compaction策略。
默认情况下,当Paimon将记录追加到LSM树时,它也会根据需要执行Compaction。用户还可以选择在“专用Compaction作业”中独立执行所有Compaction。
官网地址:paimon-flink下载
1)修改flink-conf.yaml配置
#解决中文乱码,1.17之前参数是env.java.opts
env.java.opts.all: -Dfile.encoding=UTF-8
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 3
execution.checkpointing.interval: 10s
state.checkpoints.dir: hdfs://hadoop:9000/ckps
2)启动 Flink集群
(1)解决依赖问题
cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink-1.17.0/lib/
(2)这里以 Yarn-Session模式为例
/opt/module/flink-1.17.0/bin/yarn-session.sh -d
3)启动Flink的sql-client
/opt/module/flink-1.17.0/bin/sql-client.sh -s yarn-session
Paimon Catalog可以持久化元数据,当前支持两种类型的metastore:
通常都是使用hive类型。
1)上传 hive-connector
将flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar上川到Flink的lib目录下
2)重启yarn-session集群
3)启动hive的metastore服务
nohup hive --service metastore &
4)创建Hive Catalog
CREATE CATALOG hive_catalog WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://hadoop:9083',
'hive-conf-dir' = '/opt/module/hive/conf',
'warehouse' = 'hdfs://hadoop:9000/paimon/hive'
);
USE CATALOG hive_catalog;
5)注意事项
使用hive Catalog
通过alter table更改不兼容的列类型时,参见 HIVE-17832。需要配置
vim /opt/module/hive/conf/hive-site.xml;
<property>
<name>hive.metastore.disallow.incompatible.col.type.changes</name>
<value>false</value>
</property>
上述配置需要在hive-site.xml中配置,且hive metastore服务需要重启。
如果使用的是 Hive3,请禁用 Hive ACID:
hive.strict.managed.tables=false
hive.create.as.insert.only=false
metastore.create.as.acid=false
6)Sql初始化文件
1)创建初始化sql文件
vim conf/sql-client-init.sql
CREATE CATALOG hive_catalog WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://hadoop:9083',
'hive-conf-dir' = '/opt/module/hive/conf',
'warehouse' = 'hdfs://hadoop:9000/paimon/hive'
);
USE CATALOG hive_catalog;
SET 'sql-client.execution.result-mode' = 'tableau';
2)启动sql-client时,指定该sql初始化文件
bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql
3)查看catalog
show catalogs;
show current catalog;
Paimon的写入性能与检查点密切相关,因此需要更大的写入吞吐量:
建议sink的并行度小于等于bucket的数量,最好相等。
当Sorted Run数量较少时,Paimon writer 将在单独的线程中异步执行压缩,因此记录可以连续写入表中。然而,为了避免Sorted Runs的无限增长,当Sorted Run的数量达到阈值时,writer将不得不暂停写入。下表属性确定阈值。
选项 | 必需的 | 默认 | 类型 | 描述 |
---|---|---|---|---|
num-sorted-run.stop-trigger | No | (none) | Integer | 触发停止写入的Sorted Runs次数,默认值为 ‘num-sorted-run.compaction-trigger’ + 1 |
当 num-sorted-run.stop-trigger
变大时,写入停顿将变得不那么频繁,从而提高写入性能。但是,如果该值变得太大,则查询表时将需要更多内存和 CPU 时间。如果您担心内存 OOM,请配置sort-spill-threshold。它的值取决于你的内存大小。
如果希望某种模式具有最大写入吞吐量,则可以缓慢而不是匆忙地进行Compaction。可以对表使用以下策略
num-sorted-run.stop-trigger = 2147483647
sort-spill-threshold = 10
Paimon使用LSM树,支持大量更新。 LSM 在多次Sorted Runs中组织文件。从 LSM 树查询记录时,必须组合所有Sorted Runs以生成所有记录的完整视图。
过多的Sorted Run会导致查询性能不佳。为了将Sorted Run的数量保持在合理的范围内,Paimon writers 将自动执行Compaction。下表属性确定触发Compaction的最小Sorted Run数。
选项 | 必需的 | 默认 | 类型 | 描述 |
---|---|---|---|---|
num-sorted-run.compaction-trigger | No | 5 | Integer | 触发Compaction的Sorted Run数。包括 0 级文件(一个文件一级排序运行)和高级运行(一个一级排序运行) |
在write初始化时,bucket的writer需要读取所有历史文件。如果这里出现瓶颈(例如同时写入大量分区),可以使用write-manifest-cache缓存读取的manifest数据,以加速初始化。
Paimon writer中主要占用内存的地方有3个:
配置“full-compaction.delta-commits”在Flink写入中定期执行full-compaction。并且可以确保在写入结束之前分区被完全Compaction。
注意:Paimon 默认处理小文件并提供良好的读取性能。请不要在没有任何要求的情况下配置此Full Compaction选项,因为它会对性能产生重大影响。
对于主键表来说,这是一种“MergeOnRead”技术。读取数据时,会合并多层LSM数据,并行数会受到桶数的限制。虽然Paimon的merge会高效,但是还是赶不上普通的AppendOnly表。
如果你想在某些场景下查询得足够快,但只能找到较旧的数据,你可以:
小文件会降低读取速度并影响 DFS 稳定性。默认情况下,当单个存储桶中的小文件超过“compaction.max.file-num”(默认50个)时,就会触发compaction。但是当有多个桶时,就会产生很多小文件。
您可以使用full-compaction来减少小文件。full-compaction将消除大多数小文件。
Paimon 对 parquet 读取进行了一些查询优化,因此 parquet 会比 orc 稍快一些。
Paimon的快照管理支持向多个writer写入。
默认情况下,Paimon支持对不同分区的并发写入。推荐的方式是streaming job将记录写入Paimon的最新分区;同时批处理作业(覆盖)将记录写入历史分区。
如果需要多个Writer写到同一个分区,事情就会变得有点复杂。例如,不想使用 UNION ALL,那就需要有多个流作业来写入“partial-update”表。参考如下的“Dedicated Compaction Job”。
默认情况下,Paimon writer 在写入记录时会根据需要执行Compaction。这对于大多数用例来说已经足够了,但有两个缺点:
为了避免这些缺点,用户还可以选择在writer中跳过Compaction,并仅运行专门的作业来进行Compaction。由于Compaction仅由专用作业执行,因此writer可以连续写入记录而无需暂停,并且不会发生冲突。
设置表属性 'write-only'='true'
,如果设置为 true,将跳过Compaction和快照过期。此选项与独立Compaction一起使用。
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7-SNAPSHOT.jar \
compact \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--partition <partition-name>] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
如果提交一个批处理作业(execution.runtime-mode:batch),当前所有的表文件都会被Compaction。如果您提交一个流作业(execution.runtime-mode: Streaming),该作业将持续监视表的新更改并根据需要执行Compaction。
Paimon Writer每次提交都会生成一个或两个快照。每个快照可能会添加一些新的数据文件或将一些旧的数据文件标记为已删除。然而,标记的数据文件并没有真正被删除,因为Paimon还支持时间旅行到更早的快照。它们仅在快照过期时被删除。
目前,Paimon Writer在提交新更改时会自动执行过期操作。通过使旧快照过期,可以删除不再使用的旧数据文件和元数据文件,以释放磁盘空间。
选项 | 必需的 | 默认 | 类型 | 描述 |
---|---|---|---|---|
snapshot.time-retained | No | 1 h | Duration | 已完成快照的最长时间保留。 |
snapshot.num-retained.min | No | 10 | Integer | 要保留的已完成快照的最小数量。 |
snapshot.num-retained.max | No | Integer.MAX_VALUE | Integer | 要保留的已完成快照的最大数量。 |
注意,保留时间太短或保留数量太少可能会导致如下问题:
Consumer Id
来保护快照过期的小保留时间内的流式读取)。创建分区表时可以设置partition.expiration-time。 Paimon会定期检查分区的状态,并根据时间删除过期的分区。
判断分区是否过期:将分区中提取的时间与当前时间进行比较,看生存时间是否超过partition.expiration-time。比如:
CREATE TABLE T (...) PARTITIONED BY (dt) WITH (
'partition.expiration-time' = '7 d',
'partition.expiration-check-interval' = '1 d',
'partition.timestamp-formatter' = 'yyyy-MM-dd'
);
小文件可能会导致:
1)Flink Checkpoint的影响
使用Flink Writer,每个checkpoint会生成 1-2 个快照,并且checkpoint会强制在 DFS 上生成文件,因此checkpoint间隔越小,会生成越多的小文件。
默认情况下,不仅checkpoint会导致文件生成,writer的内存(write-buffer-size)耗尽也会将数据flush到DFS并生成相应的文件。可以启用 write-buffer-spillable 在 writer 中生成溢出文件,从而在 DFS 中生成更大的文件。
所以,可以设置如下:
2)快照的影响
Paimon维护文件的多个版本,文件的Compaction和删除是逻辑上的,并没有真正删除文件。文件只有在 Snapshot 过期后才会被真正删除,因此减少文件的第一个方法就是减少 Snapshot 过期的时间。 Flink writer 会自动使快照过期。
3)分区和分桶的影响
表数据会被物理分片到不同的分区,里面有不同的桶,所以如果整体数据量太小,单个桶中至少有一个文件,建议你配置较少的桶数,否则会出现也有很多小文件。
4)主键表LSM的影响
LSM 树将文件组织成Sorted Runs的运行。Sorted Runs由一个或多个数据文件组成,并且每个数据文件恰好属于一个Sorted Runs。
默认情况下,Sorted Runs数取决于 num-sorted-run.compaction-trigger
,这意味着一个桶中至少有 5 个文件。如果要减少此数量,可以保留更少的文件,但写入性能可能会受到影响。
5)仅追加表的文件的影响
默认情况下,Append-Only 还会进行自动Compaction以减少小文件的数量
对于分桶的 Append-only 表,为了排序会对bucket内的文件行Compaction,可能会保留更多的小文件。
6)Full-Compaction的影响
主键表是5个文件,但是Append-Only表(桶)可能单个桶里有50个小文件,这是很难接受的。更糟糕的是,不再活动的分区还保留了如此多的小文件。
建议配置Full-Compaction,在Flink写入时配置‘full-compaction.delta-commits’定期进行full-compaction。并且可以确保在写入结束之前分区被full-compaction。