Apache Iceberg 是由 Netflix 开发开源的,其于2018年11月16日进入 Apache 孵化器,是 Netflix 公司数据仓库基础。Apache Iceberg设计初衷是为了解决Hive离线数仓计算慢的问题,经过多年迭代已经发展成为构建数据湖服务的表格式标准。
Iceberg 本质上是一种专为海量分析设计的表格式标准,可为主流计算引擎如 Presto、Spark 等提供高性能的读写和元数据管理能力。Iceberg 不关注底层存储(如 HDFS)与表结构(业务定义),它为两者之间提供了一个抽象层,将数据与元数据组织了起来。
Apache Iceberg官方网站是:Apache Iceberg
官方对Iceberg的定义如下:
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, Hive and Impala using a high-performance table format that works just like a SQL table.
上述翻译过来就是:
Apache Iceberg是一种用于大型分析数据集的开放表格式。Iceberg使用一种高性能的表格式将表添加到计算引擎中,这些引擎包括Spark、Trino、PrestgreSQL、Flink、Hive和Impala,该格式的工作方式类似于SQL表。
Apache Iceberg作为一款新兴的数据湖解决方案在实现上高度抽象,在存储上能够对接当前主流的HDFS,S3文件系统并且支持多种文件存储格式,例如Parquet、ORC、AVRO。相较于Hudi、Delta与Spark的强耦合,Iceberg可以与多种计算引擎对接,目前社区已经支持Spark读写Iceberg、Impala/Hive查询Iceberg。
从Iceberg的定义中不难看出,它的定位是在计算引擎之下,又在存储之上。同时,它也是一种数据存储格式,Iceberg则称其为"table format"。因此,这类技术可以看作介于计算引擎和数据存储格式中间的数据组织格式,通过特定的方式将数据和元数据组织起来,所以称之为数据组织格式更为合理。
Iceberg用户体验:
Iceberg避免了令人不愉快的惊喜。模式演化工作正常,并且不会意外地取消删除数据。用户无需了解分区即可获得快速查询。
Iceberg 主要特性:
依赖以上特性,Iceberg 可帮助用户低成本的实现 T+0 级数据湖。
可靠性和性能
Iceberg专为大型表格而构建。在生产环境中,Iceberg被用于单个表格可能包含数十PB的数据,甚至这些庞大的表格也可以在没有分布式SQL引擎的情况下进行读取。
开放标准
Iceberg被设计和开发为一个开放的社区标准,具有规范以确保跨语言和实现的兼容性。
Apache Iceberg是开源的,并在Apache Software Foundation进行开发。
核心能力
(1) 灵活的文件组织
(2) 丰富的计算引擎
(3) 优化数据入湖流程
(4) 增量读取数据处理能力
Iceberg实现核心思想:
Iceberg的核心思想,就是在时间轴上跟踪表的所有变化:
Iceberg元数据管理:
Iceberg将数据进行分层管理,主要分为:元数据管理层和数据存储层。
元数据管理层又可以细分为三层:
基于snapshot的管理方式,Iceberg能够进行time travel(历史版本读取以及增量读取),并且提供了serializable isolation。
数据存储层支持不同的文件格式,目前支持Parquet、ORC、AVRO。
总体来讲Iceberg分为两部分数据,第一部分是数据文件,如下图中的 parquet 文件。第二部分是表元数据文件(Metadata 文件),包含 Snapshot 文件(snap-*.avro)、Manifest 文件(*.avro)、TableMetadata 文件(*.json)等。
元数据文件
其中metadata目录存放元数据管理层的数据,表的元数据是不可修改的,并且始终向前迭代;当前的快照可以回退。
快照:
快照代表一张Iceberg表在某一时刻的状态。也被称为清单列表(Manifest List),里面存储的是清单文件列表,每个清单文件占用一行数据。清单列表文件以snap开头,以avro后缀结尾,每次更新都产生一个清单列表文件。每行中存储了清单文件的路径。
清单文件里面存储数据文件的分区范围、增加了几个数据文件、删除了几个数据文件等信息。数据文件(Data Files)存储在不同的Manifest Files里面,Manifest Files存储在一个Manifest List文件里面,而一个Manifest List文件代表一个快照。
清单文件:
是以avro格式进行存储的,以avro后缀结尾,每次更新操作都会产生多个清单文件。其里面列出了组成某个快照(snapshot)的数据文件列表。每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)、文件的大小以及文件里面数据的行数等信息。其中列级别的统计信息在 Scan 的时候可以为算子下推提供数据,以便可以过滤掉不必要的文件。
数据文件
data目录组织形式类似于hive,都是以分区进行目录组织。
Iceberg的数据文件通常存放在data目录下。一共有三种存储格式(Avro、Orc和Parquet),主要是看选择哪种存储格式,后缀分别对应avro、orc或者parquet。在一个目录,通常会产生多个数据文件。
快照隔离
增量读取数据
Iceberg的每个snapshot都包含前一个snapshot的所有数据,每次都相当于全量读取数据,对于整个链路来说,读取数据的代价是非常高的。
如果我们只想读取当前时刻的增量数据,就可以根据Iceberg中Snapshot的回溯机制来实现,仅读取Snapshot1到Snapshot2的增量数据,也就是下图中的紫色数据部分。
同理,S3也可以只读取红色部分的增量数据,也可以读取S1-S3的增量数据。
Iceberg支持读写分离,也就是说可以支持并发读和增量读。
原子性操作
对于文件列表的所有修改都是原子操作。
注意:Iceberg是以文件为粒度提交事务的,所以就没有办法做到以秒为单位提交事务,否则会造成文件数据量膨胀。
比如Flink是以CheckPoint为写入单位,物理数据在写入Iceberg之后并不能被直接查询,只有当触发了CheckPoint时才会写metadata,这时数据才会由不可见变成可见。而每次CheckPoint执行也需要一定的时间。
写操作要求
原子性替换保证了线性的历史;
原子性替换需要依靠以下操作来保证。
冲突解决--乐观锁
Flink+Iceberg最经典的一个场景就是构建实时的Data Pipeline。业务端产生的大量日志数据,被导入到Kafka这样的消息队列。运用Flink流计算引擎执行ETL后,导入到Apache Iceberg原始表中。有一些业务场景需要直接跑分析作业来分析原始表的数据,而另外一些业务需要对数据做进一步的提纯。那么我们可以再新起一个Flink作业从Apache Iceberg表中消费增量数据,经过处理之后写入到提纯之后的Iceberg表中。此时,可能还有业务需要对数据做进一步的聚合,那么我们继续在Iceberg表上启动增量Flink作业,将聚合之后的数据结果写入到聚合表中。
该场景也可通过Flink+Hive方式实现,但Flink长期高频率地写入会造成Partition膨胀。而Iceberg容许实现1分钟甚至30秒的增量写入,这样就可以大大提高了端到端数据的实时性,上层的分析作业可以看到更新的数据,下游的增量作业可以读取到更新的数据。
可以用Flink+Iceberg来分析来自MySQL等关系型数据库的binlog等。一方面,Apache Flink已经原生地支持CDC数据解析,一条binlog数据通过ververica flink-cdc-connector拉取之后,自动转换成Flink Runtime能识别的INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER四种消息,供用户做进一步的实时计算。
此外,CDC数据成功入湖Iceberg之后,还会打通常见的计算引擎,例如Presto、Spark、Hive等,他们都可以实时地读取到Iceberg表中的最新数据。
针对CDC场景而言,Hudi在这方面的能力比Iceberg更成熟,所以厂商更多选用Hudi。
采用Iceberg全量数据和Kafka的增量数据来驱动新的Flink作业。如果需要过去很长时间例如一年的数据,可以采用常见的Lambda架构,离线链路通过kafka->flink->iceberg同步写入到数据湖,由于Kafka成本较高,保留最近7天数据即可,Iceberg存储成本较低,可以存储全量的历史数据,启动新Flink作业的时候,只需要去拉Iceberg的数据,跑完之后平滑地对接到kafka数据即可。
在Lambda架构下,实时链路由于事件丢失或者到达顺序的问题,可能导致流计算端结果不一定完全准确,这时候一般都需要全量的历史数据来订正实时计算的结果。而我们的Iceberg可以很好地充当这个角色,因为它可以高性价比地管理好历史数据。
原有的lambda架构,分为离线链路和实时链路。可通过iceberg进行存储中间数据,Flink统一进行计算,实现近实时场景的流批一体。
无论是用于加速数据的可见性、构建CDC还是用Iceberg替代Hive表的低效查询,都会带来一定的性能提升。
使用Iceberg构建湖仓的优点主要包括:
然而,使用Iceberg构建湖仓也存在一些缺点:
数据治理方面iceberg也存在一些缺点:
当单链路的数据量达到分钟级,每日达到万亿规模时,湖仓一体的性能问题就需要格外重视。
湖仓一体实时性限制:
抛开内核,无论是Iceberg还是Hudi,本质上都是海量文件的组织方式,无法摆脱存储的限制,我们通常会把它存到内部的HDFS上,云上则会存到对象存储中。但对象存储也有它的限制,吞吐量较大,但延迟会较高。
如果需要流读,我们通常在构建实时链路的时候,会选择消息队列,它的存储模型完全不同,是低延迟高响应,顺序读写。它的存储能力决定了计算,流式计算的访问方式和离线计算的访问方式不同。
这个时候就会出现两个问题:
由于Iceberg的特性,大多数的厂商都是基于iceberg构建湖仓。针对Iceberg的缺点,各厂商也对内核进行了优化,保证湖仓的稳定运行。
腾讯构建的近实时流批一体架构如下图所示。腾讯针对Iceberg的不足对内核做了大量的优化。功能上增加了大宽表支持、跨源查询支持、流转批、流式写入支持去重、增量读取、流量控制等。性能上完成元数据读取加速、复杂类型列剪支优化、V2表layout改进与合并加速、向量化,Async-IO,CBO等查询加速。通过小文件合并、自动重分布优化、自动优化实现自动数据治理。
参考Paimon方案,腾讯做了类似的方案。在这个方案中,流和批选择了不同的存储,流选择使用消息队列,批则是底层使用数据湖的格式,封装在一起就成为了流批表。有了流批表,则能够对外提供统一的流和批的读写接口。
针对Flink场景,写的时候会双写到LogStore和Filestore这两个系统中,根据不同的场景读不同的系统。如果是流式则读LogStore,批则读Filestore。
网易主要研发了 Arctic 产品,它是基于 Iceberg 去构建湖仓一体的系统。其定位是在 hive 和 Iceberg 之上,在计算引擎之下的一个 TableService, 并提供表结构优化以及 Kafka 以及 redis, Hbase 等 KV 存储封装的实时湖仓系统。
Arctic 的优势首先是基于 iceberg,兼容 iceberg 所有功能,同时对 hive 兼容性好,在短时间业务升级阻力更低;支持动态调度自动触发合并任务,提供分钟级别延迟数仓的 merge on read;在开启 hidden on queue 的情况下,提供流批一体的功能包括秒级实时订阅和实时 join;还提供方便管理的运维平台,方便业务更快的上手。
网易选择使用 Apache Iceberg,主要考虑是因为 Iceberg 本身的元数据管理是面向文件的,有非常全的 manifest 机制,可以把表中的所有文件管理起来,Iceberg 作为底座提供了 ACID 的事务保证以及 MVCC 功能,可以保证数据的一致性,同时又具有可扩展性。在 Iceberg 的基础上,团队又自研了实时摄取、文件索引、数据合并,以及一整套元数据管理服务。
底层技术选型方面,Hudi 的文件索引采用的是 Bloomfilter 以及 HBase 的机制,这两种机制都不是特别理想,HBase 需要引入第三方 KV 数据库,对商业输出不利,而 Bloomfilter 比较重,会让实时性大打折扣,因此都不太适合网易数帆的技术选型。网易数帆对 Arctic 核心功能的想法和设计,也跟 Hudi 有出入。而没选 Delta Lake 则是因为它对实时性并不是看得很重。网易数据团队通过研究相关论文发现,Delta Lake 更多还是把 Spark 的生态作为第一优先级,这与团队做湖仓一体的目标还是有一些区别。相比之下,Iceberg 相对更开放,对计算引擎的集成、对上层元数据的集成、对不同系统的集成都做得比较好,可以满足团队高度定制化的需求。
网易集团内部主要进行 Lambda 架构的改造,而针对企业客户,主要的实践是 Kappa 架构。
严选根据当时社区发展情况和严选当时的需求场景最终选择了Iceberg,主要考虑因素如下:
严选通过缓存统计信息过滤优化、小文件合并、重写deleteFile、重排序等一系列优化,大大提高查询效率。
未来,严选在查询体验上,计划让presto也接入iceberg的支持,引入Alluxio缓存来加速元数据的加载和缓存数据,加入Z-order数据重排序和Bloom-Filter文件索引等功能提升查询效率。另外把文件监控、健康检查等功能产品化以提升易用性。
B站同样选择Iceberg,因为Iceberg在三个里面是表存储格式抽象的最好的,包括读写引擎、Table Schema、文件存储格式都是pluggable的,可以进行比较灵活的扩展,并保证和开源以及之前版本的兼容性。
下图是B站整体的湖仓一体架构,支持开放的Spark、Flink等引擎从Kafka、HDFS接入数据,然后Magnus服务会异步地拉起Spark任务对Iceberg数据进行重新的存储组织优化,主要是用Trino作为查询引擎,并引入Alluxio做Iceberg的元数据和索引数据的缓存加速。
Magnus是B站湖仓一体架构的核心组件,它负责管理优化所有的Iceberg表中的数据。Iceberg本身是一个表存储格式,虽然其项目本身提供了基于Spark、Flink等用于合并小文件,合并metadata文件或者清理过期Snapshot数据等Action Job,但是要依赖外部服务调度这些Action Job,而Magnus正是承担这个角色。B站对Iceberg进行了扩展,当Iceberg表发生更新的时候,会发送一个event信息到Magnus服务中,Magnus服务维护一个队列用于保存这些commit event信息,同时Magnus内部的Scheduler调度器会持续消费event队列,并根据对应Iceberg表的元数据信息及相关的策略决定是否及如何拉起Spark任务优化Iceberg表的数据组织。
基于Iceberg的湖仓一体方案在B站的数据分析场景正逐渐落地,目前已经支撑PB级的数据量,每天响应几万个查询,其中P90的查询可以在1s内响应,满足了多个运营分析数据服务交互式分析的需求。
汽车之家首先打通Append入湖链路、Flink SQL 入湖链路,然后内部对接预算体系、权限体系等。在对接过程中通过方法重写优化解决iceberg小文件等问题,从而实现湖仓的构建,保证湖仓的性能。