Apache Hudi是一个Data Lakes的开源方案,Hudi是Hadoop Upserts Delete and Incremental的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi能够基于HDFS之上管理大型分析数据集,可以对数据进行插入、更新、增量消费等操作,主要目的是高效减少摄取过程中的数据延迟。
Apache Hudi官方网站是:?Apache Hudi
官方对 Hudi 的定义如下:
Apache Hudi is a transactional data lake platform that brings database and data warehouse capabilities to the data lake. Hudi reimagines slow old-school batch data processing with a powerful new incremental processing framework for low latency minute-level analytics.
Apache Hudi将核心仓库和数据库功能直接引入数据湖。Hudi提供了表、事务、高效的upserts/delete、高级索引、流摄取服务、数据集群/压缩优化和并发,同时保持数据的开源文件格式。Hudi基于Parquet列式存储与Avro行式存储,同时避免创建小文件,实现高效率低延迟的数据访问。
Apache Hudi不仅非常适合于流工作负载,而且还允许创建高效的增量批处理管道。
Apache Hudi可以轻松地在任何云存储平台上使用。Hudi的高级性能优化,使分析工作负载更快的任何流行的查询引擎,包括Apache Spark、Flink、Presto、Trino、Hive、Impala等。
Apache Hudi 的特点:
Apache Hudi在DFS的数据集上提供以下流原语
这两个原语是Hudi 使用最广泛的两个功能,这使用户能够吸收更改数据捕获,并将其大规模应用于数据湖。
Apache Hudi 设计原则:
流式读/写:Hudi借鉴了数据库设计的原理,从零设计,应用于大型数据集记录流的输入和输出。为此,Hudi提供了索引实现,可以将记录的键快速映射到其所在的文件位置。同样,对于流式输出数据,Hudi通过其特殊列添加并跟踪记录级的元数据,从而可以提供所有发生变更的精确增量流。
自管理:Hudi注意到用户可能对数据新鲜度(写友好)与查询性能(读/查询友好)有不同的期望,它支持了三种查询类型,这些类型提供实时快照,增量流以及稍早的纯列数据。在每一步,Hudi都努力做到自我管理(例如自动优化编写程序的并行性,保持文件大小)和自我修复(例如:自动回滚失败的提交),即使这样做会稍微增加运行时成本(例如:在内存中缓存输入数据已分析工作负载)。如果没有这些内置的操作杠杆/自我管理功能,这些大型流水线的运营成本通常会翻倍。
万物皆日志:Hudi还具有 append only、云数据友好的设计,该设计实现了日志结构化存储系统的原理,可以无缝管理所有云提供商的数据。
键-值数据模型:在写方面,Hudi表被建模为键值对数据集,其中每条记录都有一个唯一的记录键。此外,一个记录键还可以包括分区路径,在该路径下,可以对记录进行分区和存储。这通常有助于减少索引查询的搜索空间。
写Hudi表的组件使用了一种受支持的方式嵌入到Apache Spark作业中,它会在支持DFS的存储上生成代表Hudi表的一组文件。然后,在具有一定保证的情况下,诸如Apache Spark、Presto、Apache Hive之类的查询引擎可以查询该表。
Hudi表的三个主要组件:
Hudi提供了以下功能来对基础数据进行写入、查询,这使其成为大型数据湖的重要模块:
下面分别对每个组件进行介绍。
在它的核心,Hudi维护一条包含在不同的即时时间(instant time)对数据集做的所有instant操作的时间轴,从而提供,从不同时间点出发得到不同的视图下的数据集。时间轴类似于数据库的redo/transaction日志,由一组时间轴实例组成。Hudi保证在时间轴上执行的操作的原子性和基于即时时间的时间轴一致性。时间轴被实现为表基础路径下.hoodie元数据文件夹下的一组文件。具体来说,最新的instant被保存为单个文件,而较旧的instant被存档到时间轴归档文件夹中,以限制writers和queries列出的文件数量。Hudi时间轴包含以下组件:
Hudi保证在时间轴上执行的操作的原子性和基于即时时间的时间轴一致性。
每个instant都有avro或者json格式的元数据信息,详细的描述了该操作的状态以及这个即时时刻instant的状态。执行的关键操作包括
任何给定的即时都可以处于以下状态之一
示例:
上面的示例显示了在Hudi数据集上大约10:00到10:20之间发生的更新事件,大约每5分钟一次,将提交元数据以及其他后台清理/压缩保留在Hudi时间轴上。 观察的关键点是:提交时间指示数据的到达时间(上午10:20),而实际数据组织则反映了实际时间或事件时间,即数据所反映的(从07:00开始的每小时时段)。在权衡数据延迟和完整性时,这是两个关键概念。
如果有延迟到达的数据(事件时间为9:00的数据在10:20达到,延迟 >1 小时),我们可以看到upsert将新数据生成到更旧的时间段/文件夹中。 在时间轴的帮助下,增量查询可以只提取10:00以后成功提交的新数据,并非常高效地只消费更改过的文件,且无需扫描更大的文件范围,例如07:00后的所有时间段。
Hudi将DFS上的数据集组织到基本路径下的目录结构中。数据集分为多个分区,这些分区是包含该分区的数据文件的文件夹,这与Hive表非常相似。 每个分区被相对于基本路径的特定分区路径区分开来。
在每个分区内,文件被组织为文件组,由文件id唯一标识。 每个文件组包含多个文件切片,其中每个切片包含在某个提交/压缩即时时间生成的基本列文件(*.parquet)以及一组日志文件(*.log*),该文件包含自生成基本文件以来对基本文件的插入/更新。 Hudi采用MVCC设计,其中压缩操作将日志和基本文件合并以产生新的文件片,而清理操作则将未使用的/较旧的文件片删除以回收DFS上的空间。
Hudi通过索引机制将给定的hoodie键(记录键+分区路径)映射到文件组,从而提供了高效的Upsert。 一旦将记录的第一个版本写入文件,记录键和文件组/文件id之间的映射就永远不会改变。 简而言之,映射的文件组包含一组记录的所有版本。
Hudi通过索引机制提供高效的upsert操作,该机制会将一个记录键+分区路径组合一致性的映射到一个文件ID。这个记录键和文件组/文件ID之间的映射自记录被写入文件组开始就不会再改变。简而言之,这个映射文件组包含了一组文件的所有版本。Hudi 支持了多种类型的索引实现,典型的如 BLOOM、BUCKET 索引,以及自定义索引等方式。这些索引映射一个记录键到包含该记录的文件ID。这将使我们无需扫描表中的每条记录,就可显著提高upsert速度。Hudi索引可以根据其查询分区记录的能力进行分类:
Hudi支持以下存储类型。
写时复制:
写时复制存储中的文件片仅包含基本/列文件,并且每次提交都会生成新版本的基本文件。 换句话说,我们压缩每个提交,从而所有的数据都是以列数据的形式储存。在这种情况下,写入数据非常昂贵(我们需要重写整个列数据文件,即使只有一个字节的新数据被提交),而读取数据的成本则没有增加。 这种视图有利于读取繁重的分析工作。
示例1:
以下内容说明了将数据写入写时复制存储并在其上运行两个查询时,它是如何工作的。
随着数据的写入,对现有文件组的更新将为该文件组生成一个带有提交即时时间标记的新切片,而插入分配一个新文件组并写入该文件组的第一个切片。 这些文件切片及其提交即时时间在上面用颜色编码。 针对这样的数据集运行SQL查询(例如:select count(*)统计该分区中的记录数目),首先检查时间轴上的最新提交并过滤每个文件组中除最新文件片以外的所有文件片。 如您所见,旧查询不会看到以粉红色标记的当前进行中的提交的文件,但是在该提交后的新查询会获取新数据。因此,查询不受任何写入失败/部分写入的影响,仅运行在已提交数据上。
写时复制存储的目的是从根本上改善当前管理数据集的方式,通过以下方法来实现
示例2:
在instant 0 时刻写入一部分数据(ABCDE),在instant 1时刻更新A -> A',D -> D',在instant 2时刻更新A' -> A'',E -> E',并插入F 那么对于快照查询(Snapshot Query)每次都是读取的最新的FileSlice,增量查询(Incremental Query)读取指定commit之间的Parquet文件,然后再将时间范围下推至Parquet文件进行过滤,只读取符合条件的变更的数据。
读时合并:
读时合并存储是写时复制的升级版,从某种意义上说,它仍然可以通过读优化表提供数据集的读取优化视图(写时复制的功能)。 此外,它将每个文件组的更新插入存储到基于行的增量日志中,通过文件id,将增量日志和最新版本的基本文件进行合并,从而提供近实时的数据查询。因此,此存储类型智能地平衡了读和写的成本,以提供近乎实时的查询。 这里最重要的一点是压缩器,它现在可以仔细挑选需要压缩到其列式基础文件中的增量日志(根据增量日志的文件大小),以保持查询性能(较大的增量日志将会提升近实时的查询时间,并同时需要更长的合并时间)。
以下内容说明了存储的工作方式,并显示了对近实时表和读优化表的查询。
示例1:
读时合并存储上的目的是直接在DFS上启用近实时处理,而不是将数据复制到专用系统,后者可能无法处理大数据量。 该存储还有一些其他方面的好处,例如通过避免数据的同步合并来减少写放大,即批量数据中每1字节数据需要的写入数据量。
示例2:
对于MOR表,快照查询(SNAPSHOT Query)读取的是Base文件与Log合并后的最新结果;而增量查询读取指定commit之间的Parquet以及Log文件,然后再对Log文件进行Block级别的过滤(根据Commit时间),合并重复key后返回结果。
Hudi支持以下存储数据的视图
这个场景主要是DB数据入湖入仓,把原来T + 1的数据新鲜度提升到分钟级别。数据新鲜度通过目前比较火的以Debezium、Maxwell为代表的CDC(change Data Capture)技术实现。以Streaming近实时的方式同步到数仓中。在传统的Hive数仓中想保证实时是非常困难的,尤其是文件更新,湖表实时写入更新,基本不可能实现。
CDC技术对数仓本身存储是有要求的,首先是更新效率得足够高,能够支持以Streaming方式写入,并且能够非常高效的更新。尤其是CDC log在更新过程中还可能会乱序,如何保证这种乱序更新的ACID语义,是有很高要求的,当前能满足乱序更新的湖格式只有Hudi能做到,而且Hudi还考虑到了更新的效率问题,是目前比较先进的架构。
图中方案3相比上面的方案,比较适合体量比较大(每天增量能达到亿级别)、数据平台比较健全的公司,中间有一套统一的数据同步方案(汇总不同源表数据同步至消息队列),消息队列承担了数据的容错、容灾、缓存功能。同时,这套方案的扩展性也更好。通过Kafka的topic subscribe方式,可以比较灵活地分发数据。通过以上三种方式入湖Hudi,以某数据中台为例已经有6000多张源表写入Hudi,日增几十亿数据入湖。
构造分钟级别的实时数仓,分钟级别的端到端数据新鲜度,同时又非常开放的OLAP查询引擎可以适配。其实是对Kappa架构或者是原先Streaming数仓架构的一套新解法。在没有这套架构之前,实时分析会跳过Hudi直接把数据双写到OLAP系统中,比如ClickHouse、ES、MongoDB等。当数仓存储已经可以支持高效率分级别更新,能够对接OLAP引擎,那么这套架构就被大大简化,首先不用双写,一份数据就可以保证only one truth语义,避免双写带来数据完整性的问题。其次因为湖格式本身是非常开放的,在查询端引擎可以有更多选择,比如Hudi就支持Presto、Trino、Spark、StarRocks,以及云厂商的Redshift引擎,会有非常高的灵活度。多层数据可见性也从T+1 小时或天,缩短到分钟级别。
Apache Hudi 的Payload是一种可扩展的数据处理机制,通过不同的Payload我们可以实现复杂场景的定制化数据写入方式,大大增加了数据处理的灵活性。Hudi Payload 在写入和读取 Hudi 表时对数据进行去重、过滤、合并等操作的工具类,通过使用参数"hoodie.datasource.write.payload.class"指定我们需要使用的Payload class。为了实现 pv/uv计算,我们实现了 RecordCountAvroPayload ,它可以在对数据去重的时候,将重复数据的数量记录下来,这里的重复指的是HoodieKey(primary key + partition path)相同。以往处理方式是通过Flink + window 聚合实现,该方式有延迟数据丢弃和state爆掉风险,Hudi Payload机制则没有这些风险。
上图是一个典型的非常复杂的业务落地,消息流1由Kafka写入Hudi商品销售明细表,消息流2由Kafka写入Hudi用户基本属性表,然后结合Hudi商品标签表和Hve用户扩展属性表进行实时和离线拼接大宽表。
在实现多流拼接功能前有三个前置条件需要满足:
这里主要描述基于时间线服务器的标记机制,该机制优化了存储标记的相关延迟。Hudi 中的时间线服务器用作提供文件系统和时间线视图。新的基于时间线服务器的标记机制将标记创建和其他标记相关操作从各个执行器委托给时间线服务器进行集中处理。时间线服务器在内存中为相应的标记请求维护创建的标记,时间线服务器通过定期将内存标记刷新到存储中有限数量的底层文件来实现一致性。通过这种方式,即使数据文件数量庞大,也可以显著减少与标记相关的实际文件操作次数和延迟,从而提高写入性能。
优势
劣势
Iceberg和Hudi都是数据湖技术,从社区活跃度上来看,Iceberg有超越Hudi的趋势。他们有以下共同点:
Iceberg与Hudi之间不同点在于以下几点:
我们来看下当前github上这两种技术的发展现状,当前iceberg的fork为1.9k,star为5.1k。hudi的fork 2.5k,star为4.9k。相比来看,iceberg有追上hudi的趋势。
通用Hudi湖仓方案如下:
尽管 Hudi 解决方案已经能够实现一份存储同时包含实时和离线两种场景,但由于数据的分钟级可见,它依然存在一定的优化空间,无法作为实时数仓存储的标准方案。字节在其基础上进行了方案优化。
为解决实时性问题,字节内部在数据湖上自研了基于内存的服务,形成了一套高吞吐、高并发、秒级延迟可见的实时数据湖方案。整体架构如下:
架构底层为数据持久化层。复用 Hudi 的能力实现数据存储。文件分布和 Hudi 一致,通过列存的 base 文件与行存的 log 文件进行数据存储,基于时间戳维护数据版本。通过 filegroup 的方式对文件进行分组,相同逐渐的数据存储在同一个文件组内。后期结合数据构建索引能力,能够比较大幅度提升数据入湖和查询的性能。
架构的第二层是元数据层。对数据湖的元数据进行管理,包括表、分区以及 instant、timeline、snapshot 等这些数据湖特有的元数据。在这一层不光实现了元数据的管理,还能够解决多并发写入的冲突检查和解决,保障 ACID 能力。
架构的第三层是服务层。主要包含两个组件:BTS 和 TMS。BTS 是基于内存构建的服务层,通过内存加速数据读写操作,解决实时场景下数据生产消费的时效性问题。TMS 是聚焦在表优化的服务,会异步做一些 log 文件和 base 文件的compaction/小文件合并优化等操作。
Shopee 内部在使用过程中基于开源的 Apache Hudi 定制了自己的版本,以实现企业级的应用和一些内部业务需求的新特性。通过引入 Hudi 的 Data lake 方案,Shopee 的 Data Mart、推荐、ShopeeVideo 等产品的数据处理流程实现了流批一体、增量处理的特性,很大程度上简化了这一流程,并提升了性能。
Shopee选择使用Hudi有三个原因:
Shopee实时数据集成:
目前 Shopee 内部有大量的业务数据来自业务数据库,它们采用类似 CDC 的技术获取数据库中的变更数据,给业务方构建支持批处理和近实时增量处理的 ODS 层数据。当一个业务方的数据需要接入时,会在进行增量实时集成之前先做一次全量 Bootstrap,构建基础表,然后基于新接入的 CDC 数据进行实时构建。构建的过程中,一般根据用户需求选择构建的 COW 表或者 MOR 表。
Shopee主要使用 Hudi 的核心特性以及在此之上的扩展改造,分别满足了三个主要的用户需求场景:实时数据集成、增量视图和增量计算。并为用户带来了低延时(约 10 分钟)、降低计算资源消耗、降低存储消耗等收益。
当前Hudi主要还是应用在准实时场景:上游从Kafka以append模式接入ods的cow表,下游部分dw层业务根据流量大小选择不同类型的索引表,比如bucket index的mor表,在数据去重后进行dw构建,从而提供统一数据服务层给下游的实时和离线的业务,同时ods层和dw层统一以insert overwrite的方式进行分区级别的容灾保障,Timeline上写入一个replacecommit的instant,不会引发下游流量骤增,如下图所示:
VIVO分别在流批场景、小文件治理、生命周期管理等方向做了相关优化,上线后的收益主要体现在四个方向: