2023年12月,由阿里云主办的实时计算闭门会在北京举行,阿里云实时数仓Hologres研发负责人姜伟华现场分享Hologres+Flink构建的企业级实时数仓,实现全链路的数据实时计算、实时写入、实时更新、实时查询。同时,随着流式湖仓的兴起,Hologres除了支持Delta、Hudi等通用湖格式,在今年新增了对Paimon的深度集成,不断拓展湖仓一体能力。
随着大数据从规模化走向实时化,实时数据的需求覆盖互联网、交通、传媒、金融、政府等各个领域。实时计算在企业大数据平台的比重也在不断提高,部分行业已经达到了50%。Hologres+Flink通过众多的丰富企业级能力,替换开源复杂的各类技术组件,减少多种技术栈学习、多种集群运维、多处数据一致性维护等成本,让企业专注于业务,实现降本增效。
Hologres 是阿里云自研一站式实时数仓,以分析服务一体化架构,统一数据平台架构,实现一份数据,同时支持支持多维分析、在线服务、湖仓一体、向量计算多个场景,其中包含了:
数据高性能实时写入、更新与查询,实现写入即可查,支持列存、内置索引加速
超高QPS下KV与SQL点查、非主键点查,支持行存、具备高可用能力
无需数据搬迁,对MaxCompute、数据湖中的表进行秒级交互式查询,元数据自动发现
内置达摩院Proxima向量引擎,QPS与召回率性能超过开源向量数据库数倍
与开源组件不同的是,企业级的实时数仓需要帮助企业快速实现各类资源隔离、数据安全、敏捷运维等能力,让企业能够持续稳定、高效使用数据,保持大数据平台实时在线运行。Hologres具有资源隔离、数据加密、数据脱敏、灾备,数据备份恢复、IP白名单、数据治理,数据血缘等丰富的企业级能力。
多个计算实例组成一主多从模式,实例间共享一份存储,计算资源隔离,实现写入和读取隔离,查询和服务隔离。支持故障管理,故障节点快速自动恢复,盘古三副本提供高可靠冗余存储。
具备一定自运维能力,内置查询历史、元仓表等运维诊断信息,用户可以基于查询历史和表的元数据,提供丰富的监控和告警指标,快速定位系统瓶颈和风险点,提升自运维能力。
支持细粒度访问控制策略,支持BYOK数据存储加密和数据脱敏,支持数据备份与恢复,支持RAM、STS及独立账号等多种认证体系,通过PCI-DSS安全认证(PCI-DSS是目前全球最严格且级别最高的金融数据安全标准)。
实时数据处理导致成本增加,Hologres提供table info,包含各类数据使用的日志信息。方便了解数据有没有人在用,用了多少次,让企业可以做更好地做成本控制。
Hologres+Flink这套组合是在阿里集团内部经过多年实时化场景打磨探索出来的最佳架构,例如淘天用户增长团队成功让 3-5min 的画像分析提升到 10s 左右,CCO客户服务团队数据分析效率提升10倍,淘菜菜一年成本降低几百万。通过多年的积累,Hologres+Flink产品功能逐渐互补,以实时计算Flink为中心,实时数仓Hologres围绕其有多项产品使用路径:Hologres能够作为Flink的维表来使用;通过Flink能够把加工好的结果写入Hologres;Hologres提供binlog能够被Flink消费;Hologres Catalog 支持元数据服务、整库同步、SchemaEvolution等,后续将会具体介绍。
Hologres作为Flink的实时维表,相比其他维表具有以下优势:
Flink+Hologres组合支持高性能实时写入,右边是我们一年前的实测数据,今年应该会更高一点。可以看到在128CU的配置下,当写入表无主键,那大概每秒钟能写230万条;当写入表有主键,主键冲突丢弃新行,每秒写入可达200万条;一般来说,表格更新需要反查,那会随着数据量增大,更新性能会下降,Hologres在已经拥有20亿条数据表格做出更新,也能达到每秒钟70万条的更新性能,这种实时写入与更新的性能,是非常合适和Flink这种大量的更新和删除搭配使用的。
Hologres具有很强的更新能力的同时,还支持宽表的局部更新,在一定程度上替代Flink的多流join,同时还做了一些很细致的优化。例如上游的数据业务中数据有时候是乱序的,1点钟生产数据与1点05分生产的数据,我们希望1点05分生产的数据覆盖之前的数据,因为数据从业务时间上来说,肯定是希望后面的数据覆盖前面。但是因为整个计算链路的不确定性,它会使得有可能1点05分这个数据先到,那1点钟那个数据后到的。如果直接盲写就会变成把1点钟的数据覆盖了1点05分的数据。但Hologres在这种情况下不会做这个覆盖,即使上游乱序了,也能存储到最新的数据,保证上下游数据的一致性。
最后在Flink还未发布的1.19版本,Hologres引入了一种称之为叫fixed copy的模式。这个模式相比于右边的这个图的写入性能会更好,CPU资源也省出更多。
除了刚刚维表join以外,在流计算里面另一个痛点就是双流join,如果每一路都要保证完整的状态,从理论上来说,它就是一个成本很高的事情。我们这个方案并不是完全替代双流join,而是对于例如用户画像场景,有一个明确的ID,然后希望基于这一个已有的ID去关联若干流的数据。在这种场景下,Hologres就可以很简单的替代掉双流join,来实现一个更低成本的这样一个关联。
在画像场景里,我们要描述一个用户的画像或者一个商品的画像,有很多个维度,例如说一个用户,他的浏览习惯是什么,他的履约习惯是什么,他的退货习惯是什么等等,可以从各种维度去看这个客户,然后我们要去给用户画一个画像,判断他到底属于哪一类用户。那进行分类的时候,首先我们肯定希望知道这个用户的所有信息。希望以用户ID作为粒度,把所有用户的信息全部放在一起,然后交给一个分类器去分类,判断这个用户到底属于哪一类,这就是用户画像的一种很经典的用法。
在没有Hologres产品加入之前,只能是Flink做双流join,用Flink形成这样一个宽的一个字段,不同的维度有着不同的字段。但加入Hologres后,等于在Hologres里面建一张宽表,这张表里面它的主键就是用户ID,然后不同的Flink任务去写不同的字段。这并不是指一个任务去写所有整一行,是指每个任务仅仅写各自的几个字段,同一张表有着不同字段。这样的话利用Hologres局部更新能力,相当于自动把用户的不同维度数据关联在了一起。
有了以上表现,Hologres相对其他数仓产品比较有优势的一点是还支持了binlog。在这张宽表里面任何一个字段发生变化了以后,我们的binlog里会把整个这一行的数据都给它吐出来,并在binlog里显示出来,Flink再去对接这个binlog,这时就知道这一行的最新情况是什么,可以为这个用户重新去计算画像。
利用了Hologres的逐步更新,加上binlog配合Flink可实现一个实时用户画像的做法,它可以极大的降低成本。反过来的话,当Hologres作为Flink的源表来用,Flink通过流和批的模式都可以把这张表数据读出来。并且还可以自定义它的变化,例如全增量一体化读取,或者存量部分可以只读增量。
Hologres同时也对接了Flink的catalog,直接可以读取元数据,Flink的create table as和create database as这些语法,包括schema evolution都可以很好的适配。
如何去构建实时数仓?对于离线数仓的构建,我们具有非常标准的方法论体系,数据进来以后,从ODS层、DWD层、DWS层、ADS层这样一层一层的加工,每一层都是通过定时调度任务来完成的。
在实时数仓中,从数仓的角度来说,肯定具有分层需求。怎么样能够形成一个比较好用的方案呢?如何去解决一个实时数仓分层的问题?如果能解决这些问题,我们就能够让数据在数仓的各个层次之间自由地流动。下面为具体实时数仓分层方案介绍:
第一种最经典的数仓分层方案是经过Flink加工后交给Kafka,每加工一层就交给Kafka,然后通过Flink再加工写到下一层Kafka,写到最后通过Flink计算写到一个KV引擎对外提供服务,大家有时候不认为这是数仓分层,因为没有看见仓的概念,只存在一层层数仓数据加工。
Flink+Kafka的方案有个很严重的问题:每一层Kafka数据就是给Flink用,它几乎不能执行其他事情,当然可以说用Presto对接一下Kafka的数据源,然后去查询数据是否出错是可以的,但也就存在这点用处。所以一般大家的做法就是在下面再接一个实时数仓,把所有的开发数据再同步一遍到实时数仓里,大家要查询数据分析数据,请用实时数仓,然后Flink消费就用Kafka,这是一个经典架构,也非常成熟。但它的不足之处在于:各种同步数据要存两份,资源消耗很大,整个处理链路也很复杂;中间数据Kafka出了问题不便于排查,数据订正都比较麻烦。有时上游想要加上字段,下游需要更改更多的字段,不易响应schema动态变化。
第二个方法就是用离线的方法。Flink负责数据的清洗关联,清洗后的明细数据实时写入实时数仓形成DWD层。再由高频调度(分钟级别)构建DWS/ADS层,实现分钟级增量更新
这样的好处是方案成熟简单,成本较低。但它的不足之处就是时效性差。因为Flink写进的数据其实很及时的,但是调度任务其实很难做到很高频调度。因为再往下做调度,比方5分钟的调度,它难度就一下子上去了。因为数据量大时有时候5分钟无法跑完数据,只能增量调度,将会更加复杂。所以许多用户在实际使用时候往往选择小时级别调度,实时数仓反而退化成了准实时数仓。
第三种是物化视图的方案。其实本质上来说是前面两个方案的一个结合,Flink负责数据的清洗关联,清洗后的明细数据实时写入实时数仓形成DWD层。在实时数仓内通过物化视图来加工DWS或者ADS。现在大家各自提供的产品能力,基本都以批量物化方式运行,在本质上是将调度任务内置化。Hologres现在也在做实时的物化,在集团内已经在使用了,后续会开放到公共云,但这种实时物化视图对于技术的挑战还是比较较大的。
对于以上3种数仓分仓方案分析后,如果通过Hologres将Kafka全部替换,并使用行列共存本进行存储,就可以实现了数据在Flink和Hologres之间的传输。在统一了架构基础上,使数仓每一层的数据可以被查询和修改,并利用Flink和Hologres的资源强隔离能力,整套系统在生产环境中是高可用的。下面介绍基于Flink+Hologres的Streaming Warehouse方案详细内容。
我们将全部的Kafka换成Hologres,将KV也替换成Hologres,整个链路中,数据从Flink写进来以后就可以直接传入Hologres里。行列共存表同时会存两份数据,一份行存,一份列存,他们两份是强一致的,没有任何额外的管理开销,甚至性能有些场景还会提升一点。相比之前Kafaka架构,Flink+Hologres具有以下优势:
基于Flink+Hologres的Streaming Warehouse方案,实时数仓Hologres主要提供了三大核心能力:
如图所示,在架构图标注Flink streaming的地方,在整个数据实时加工链路中,完全不用写到任何Hologres的SQL,整个数据加工链路都完全用FlinkSQL表示。虽然FlinkSQL和Hologres SQL语法较为不同,但都可以同时对数仓每层的存储查询和复用。
该客户为物流类的客户,其核心业务围绕仓库展开,数据仓库业务场景较为复杂,主要业务分成三大类:
在以上三种业务之下,客户实时数仓大概拥有350左右的任务,用上了上万CU实时计算规模。同时对一些重要任务标定级,例如P2以上的任务,占它们的任务里面可能达到70%以上,可以看到实时对他们业务场景来说是非常重要的。其次是业务数据量实时吞吐高,日均流写入量是每秒钟2000万条。峰值的时候能到6000万条每秒,同时每天会产生每秒50万条以上的输出结果,整个数据量大概是在几百TB。
实时数仓1.0-成本降低70%-120%
这个客户是Hologres和Flink比较早期的客户,在2020年,Hologres处于商业化的一年,客户将Hologres仅仅用来替换OLAP引擎。从最底层看起,使用Kafka加Flink进行加工,加工结束后,其数据需要进行双写,分别给物流作业系统和OLAP查询系统使用,前一个数据是高保障的,如果挂了,那整个仓管理就会出现混乱的状况。客户早期对Hologres产品还是有心存疑虑,所以一份写到Lindorm,对外提供重要服务的KV点查能力,一份写到Hologres,对内提供OLAP灵活分析。这是客户最早的一个做法,主要是替换OLAP引擎,利用Hologres的强大的写入与查询性能,对于他们来说主要实现成本降低70%-120%。
实时数仓2.0-开发效率提升100%
到2022年,Hologres的Binlog产品功能已经被很多客户使用,客户通过一段时间的熟悉,对Hologres的产品也有了更多的信心。同时Hologres推出主从实例,一份数据上有一个或多个实例,实例之间完全实现共享一份数据,计算资源也是进行强隔离,于是客户将实时数仓架构升级到2.0。
从最底层看,MaxCompute进行离线数据加工,Flink写到Hologres主实例,再进行订阅,再进行二次消费,再写回去,形成一个环。客户所有查询都是通过从实例对外提供的,无论是圈选物流作业这种高保障任务,还是OLAP分析需要资源多,但是保障级别比较低的任务,都可以通过多个从实例之间的隔离保证资源隔离。整个链路中,Flink加Hologres形成的一个闭环,通用Hologres来统一对外提供数据服务。
从客户角度来看,他们实现了读写分离,同城容灾;存储上多种计算资源变相隔离,并在可用性和成本间取得比较好平衡。客户自己统计故障从6次降低为0次,整个存储量下降几百个TB,整个开发效率大幅度提升。
实时数仓3.0-性能提升100%-200%:
2023年Hologres推出计算组实例,是主从实例的升级版本,虽然主从实例有着计算之间强隔离的优点,但是最大问题是每一个实例都是独立的入口,如果白天想增加两个从实例,晚上业务不忙情况下想减去,就很难实现,业务需要重新发布。
计算组实例底下也可以认为就是主从实例,但是它上面有一个统一的入口。增加从实力或者修改名字为计算组,增加计算组应用并不需要有任何的改动,增加计算组只需要管理员配一个路由规则,这个路由规则指向我们新的计算组就可以。因此某个同学或某个业务部门现在需要新加一个业务,这时候仅仅是增加了计算组,相当于配备一条业务规则,操作非常简单。
在这种情况下,客户全部替换成计算组实例,来提供对外的服务。计算组可以手工创建或销毁,计算组可以随意纵向扩缩容,横向增加/减少,满足业务弹性资源需求;业务隔离、任务隔离、同时保持弹性,更serverless,让客户业务性能提升100%-200%。
刚才我们主要讲的是实时数仓,也是当前我们客户使用最成熟的架构。随着流式湖仓需求的兴起,当前技术上主要分为两个发展方向。一方面是数据湖的存储,包括Paimon这样的新的流式数据湖存储,让数据入湖更加简单和高效。然后在查询分析层面,通过类似Presto、Hologres这样高效的查询分析引擎,进行更好的数据查询加速。Hologres可以直接加速读写存储于OSS上的Hudi、Delta、ORC、Parquet、CSV、SequenceFile等格式类型的数据,今年我们和Paimon做了更多的深度合作,可以基于Paimon做流式湖仓的分层建模,降低开发运维成本,打破数据孤岛,实现业务洞察。
回到我们刚才的数仓分层,Hologres、Paimon都具备流式访问能力。如果基于Hologres+Paimon实现流式湖仓的分层,数仓各层都可以根据企业的存储成本、业务时效性进行灵活的调整。
数据存储Paimon,Hologres进行查询加速:提供分钟级时效性+秒级OLAP性能
例如针对一些时效性不敏感的ODS层数据,数据放在Paimon做存储,Hologres进行查询加速,Hologres能提供达到分钟级别时效性,相对来说成本更低,而且在和Paimon结合的加速查询,我们还在不断持续优化提升性能。
数据从Paimon写入Hologres:提供秒级时效性+极致OLAP性能
例如针对一些对与查询性能要求比较高的ADS层,数据可以直接进入Hologres,通过和Flink结合提供秒级时效性和极致的查询性能,查询时间可能为几十毫秒,这样成本相对更高一点,但是性能会快很多。
可以看到无论是和Flink深度集成构建企业级实时数仓,还是与Paimon结合在流式湖仓的探索优化,Hologres在演进过程中始终紧紧围绕实时场景,不断提高实时数仓的性能、可用性、用户体验。Hologres希望通过一站式实时数仓的理念,替换企业纷繁芜杂的数仓架构,让实时数仓更加干净、友好、高效,并帮助企业不断降低成本,加速数字化转型升级。