作者:米哈游大数据开发
近年来,容器、微服务、Kubernetes 等各项云原生技术的日渐成熟,越来越多的公司开始选择拥抱云原生,并开始将 AI、大数据等类型的企业应用部署运行在云原生之上。以 Spark 为例,在云上运行 Spark 可以充分享有公共云的弹性资源、运维管控和存储服务等,并且业界也涌现了不少 Spark on Kubernetes 的优秀实践。
在刚刚结束的 2023 云栖大会上,米哈游数据平台组大数据技术专家杜安明分享了米哈游大数据架构向云原生化升级过程中的目标、探索和实践,以及如何通过以阿里云容器服务 ACK 为底座的 Spark on K8s 架构,获得在弹性计算、成本节约以及存算分离方面的价值。
随着米哈游业务的高速发展,大数据离线数据存储量和计算任务量增长迅速,早期的大数据离线架构已不再满足新场景和需求。
为了解决原有架构缺乏弹性、运维复杂、资源利用率低等问题,2022 年下半年,我们着手调研将大数据基础架构云原生化,并最终在阿里云上落地了Spark on K8s + OSS-HDFS 方案,目前在生产环境上已稳定运行了一年左右的时间,并获得了弹性计算、成本节约以及存算分离这三大收益。
1. 弹性计算
由于游戏业务会进行周期版本更新、开启活动以及新游戏的上线等,对离线计算资源的需求与消耗波动巨大,可能是平时水位的几十上百倍。利用 K8s 集群天然的弹性能力,将 Spark 计算任务调度到 K8s 上运行,可以比较轻松的解决这类场景下资源消耗洪峰问题。
2. 成本节约
依托阿里云容器服务 Kubernetes 版 ACK 集群自身强大的弹性能力,所有计算资源按量申请、用完释放,再加上我们对 Spark 组件的定制改造,以及充分利用 ECI Spot 实例,在承载同等计算任务和资源消耗下,成本节约达 50%。
3. 存算分离
Spark 运行在 K8s 之上,完全使用 K8s 集群的计算资源,而访问的则数据也由 HDFS、OSS 逐步切换到 OSS-HDFS 上,中间 Shuffle 数据的读写采用 Celeborn,整套架构实现了计算和存储的解耦,易于维护和扩展。
众所周知,Spark 引擎可以支持并运行在多种资源管理器之上,比如 Yarn、K8s、Mesos 等。在大数据场景下,目前国内大多公司的 Spark 任务还是运行在 Yarn 集群之上的,Spark 在 2.3 版本首次支持 K8s,并于 2021 年 3 月发布的 Spark3.1 版本才正式 GA。
相较于 Yarn,Spark 在 K8s 上起步较晚,尽管在成熟度、稳定性等方面还存在一定的欠缺,但是 Spark on K8s 能够实现弹性计算以及成本节约等非常突出的收益,所以各大公司也都在不断进行尝试和探索,在此过程中,Spark on K8s 的运行架构也在不断的向前迭代演进。
目前,将 Spark 任务运行在 K8s 上,大多公司采用的方案依旧是在线与离线混合部署的方式。架构设计依据的原理是,不同的业务系统会有不同的业务高峰时间。大数据离线业务系统典型任务高峰期间会是凌晨的?0?点到 9 点钟,而像是各种应用微服务、Web 提供的 BI 系统等,常见的业务高峰期是白天时间,在这个时间以外的其它时间中,可以将业务系统的机器 Node 加入到 Spark 所使用的 K8s NameSpace ?中。如下图所示,将 Spark 与其他在线应用服务等都部署在一套 K8s 集群之上。
该架构的优点是可以通过在离线业务的混合部署和错峰运行,来提升机器资源利用率并降低成本,但是缺点也比较明显,即架构实施起来复杂,维护成本比较高,而且难以做到严格的资源隔离,尤其是网络层面的隔离,业务之间不可避免的会产生一定的相互影响,此外,我们认为该方式也不符合云原生的理念和未来发展趋势。
考虑到在离线混合部署的弊端,我们设计采用了一种新的、也更加符合云原生的实现架构:底层存储采用 OSS-HDFS(JindoFs),计算集群采用阿里云的容器服务 ACK,Spark 选择功能相对丰富且比较稳定的 3.2.3 版本。
OSS-HDFS 完全兼容了 HDFS 协议,除了具备 OSS 无限容量、支持数据冷热存储等优点以外,还支持了目录原子性、毫秒级 rename 操作,非常适用于离线数仓,可以很好的平替现有 HDFS 和 OSS。
阿里云 ACK 集群提供了高性能、可伸缩的容器应用管理服务,可以支持企业级 Kubernetes 容器化应用的生命周期管理,ECS 是大家所熟知的阿里云服务器,而弹性容器实例 ECI 是一种 Serverless 容器运行服务,可以按量秒级申请与释放。
该架构简单易维护,底层利用 ECI 的弹性能力,Spark 任务可以较为轻松的应对高峰流量,将 Spark 的 Executor 调度在 ECI 节点上运行,可最大程度的实现计算任务弹性与最佳的降本效果,整体架构的示意图如下所示。
在阐述具体实现之前,先简要介绍一下 Spark 在 K8s 上运行的基本原理。Pod 在 K8s 中是最小的调度单元,Spark 任务的 Driver 和 Executor 都是一个单独 Pod,每个 Pod 都分配了唯一的 IP 地址,Pod 可以包含一个或多个 Container,无论是 Driver 还是 Executor 的 JVM 进程,都是在 Container 中进行启动、运行与销毁的。
一个 Spark 任务被提交到 K8s 集群之后,首先启动的是 Driver Pod,而后 Driver 会向 Apiserver 按需申请 Executor,并由 Executor 去执行具体的 Task,作业完成之后由 Driver 负责清理所有的 Executor Pod,以下是这几者关系的简要示意图。
下图展示了完整的作业执行流程,用户在完成 Spark 作业开发后,会将任务发布到调度系统上并进行相关运行参数的配置,调度系统定时将任务提交到自研的 Launcher 中间件,并由中间件来调用 spark-k8s-cli,最终由 Cli 将任务提交至 K8s 集群上。任务提交成功之后,Spark Driver Pod 最先启动,并向集群申请分配 Executor Pod,Executor 在运行具体的 Task 时,会与外部 Hive、Iceberg、OLAP 数据库、OSS-HDFS 等诸多大数据组件进行数据的访问与交互,而 Spark Executor 之间的数据 Shuffle 则由 CeleBorn 来实现。
关于如何将 Spark 任务提交到 K8s 集群上,各个公司的做法不尽相同,下面先简要描述下目前比较常规的做法,然后再介绍目前我们线上所使用的任务提交和管理方式。
通过 spark-submit 命令直接提交,Spark 原生就支持这种方式,集成起来比较简单,也符合用户的习惯,但是不方便进行作业状态跟踪和管理,无法自动配置 Spark UI 的 Service 和 Ingress,任务结束后也无法自动清理资源等,在生产环境中并不适合。
这是目前较常用的一种提交作业方式,K8s 集群需要事先安装 spark-operator,客户端通过 kubectl 提交 yaml 文件来运行 Spark 作业。本质上这是对原生方式的扩展,最终提交作业依然是使用 spark-submit 方式,扩展的功能包括:作业管理,Service/Ingress 创建与清理,任务监控,Pod 增强等。此种方式可在生产环境中使用,但与大数据调度平台集成性不太好,对于不熟悉 K8s 的用户来说,使用起来复杂度和上手门槛相对较高。
在生产环境上,我们采用 spark-k8s-cli 的方式进行任务的提交。spark-k8s-cli 本质上是一个可执行的文件,基于阿里云 emr-spark-ack 提交工具我们进行了重构、功能增强和深度的定制。
spark-k8s-cli 融合 spark-submit 和 spark-operator 两种作业提交方式的优点,使得所有作业都能通过 spark-operator 管理,支持运行交互式 spark-shell 和本地依赖的提交,并且在使用方式上与原生 spark-submit 语法完全一致。
在上线使用初期,我们所有任务的 Spark Submit JVM 进程都启动在 Gateway Pod 中,在使用一段时间后,发现该方式稳定性不足,一旦 Gateway Pod 异常,其上的所有正在 Spark 任务都将失败,另外 Spark 任务的日志输出也不好管理。鉴于此种情况,我们将 spark-k8s-cli 改成了每个任务使用单独一个 Submit Pod 的方式,由 Submit Pod 来申请启动任务的 Driver,Submit Pod 和 Driver Pod 一样都运行在固定的 ECS 节点之上,Submit Pod 之间完全独立,任务结束后 Submit Pod 也会自动释放。spark-k8s-cli 的提交和运行原理如下图所示。
关于 spark-k8s-cli,除了上述基本的任务提交以外,我们还做了其他一些增强和定制化的功能。
K8s 集群本身并没有像 Yarn 那样提供日志自动聚合和展示的功能,Driver 和 Executor 的日志收集需要用户自己来完成。目前比较常见的方案是在各个 K8s Node上部署 Agent,通过 Agent 把日志采集并落在第三方存储上,比如 ES、SLS 等,但这些方式对于习惯了在 Yarn 页面上点击查看日志的用户和开发者来说,使用起来很不方便,用户不得不跳转到第三方系统上捞取查看日志。
为实现 K8s Spark 任务日志的便捷查看,我们对 Spark 代码进行了改造,使 Driver 和 Executor 日志最终都输出到 OSS 上,用户可以在 Spark UI 和 Spark Jobhistory 上,直接点击查看日志文件。
上图所示为日志的收集和展示原理,Spark 任务在启动时,Driver 和 Executor 都会首先注册一个 Shutdown Hook,当任务结束 JVM 退出时,调用 Hook 方法把完整的日志上传到 OSS 上。此外,想要完整查看日志,还需要对 Spark 的 Job History 相关代码做下改造,需要在 History 页面显示 stdout 和 stderr,并在点击日志时,从 OSS 上拉取对应 Driver 或 Executor 的日志文件,最终由浏览器渲染查看。另外,对于正在运行中的任务,我们会提供一个 Spark Running Web UI 给用户,任务提交成功后,spark-operator?会自动生成的 Service 和 Ingress 供用户查看运行详情,此时日志的获取通过访问 K8s 的 api 拉取对应 Pod 的运行日志即可。
基于 ACK 集群提供的弹性伸缩能力,再加上对 ECI 的充分利用,同等规模量级下的Spark 任务,运行在 K8s 的总成本要明显低于在 Yarn 固定集群上,同时也大大提高了资源利用率。
弹性容器实例 ECI 是一种 Serverless 容器运行服务,ECI 和 ECS 最大的不同就在于 ECI 是按量秒级计费的,申请与释放速度也是秒级的,所以 ECI 很适合 Spark 这一类负载峰谷明显的计算场景。
上图示意了 Spark 任务在 ACK 集群上如何申请和使用 ECI,使用前提是在集群中安装 ack-virtual-node 组件,并配置好 Vswitch 等信息,在任务运行时,Executor 被调度到虚拟节点上,并由虚拟节点申请创建和管理 ECI。
ECI 分为普通实例和抢占式实例,抢占式实例是一种低成本竞价型实例,默认有 1 小时的保护期,适用于大部分 Spark 批处理场景,超出保护期后,抢占式实例可能被强制回收。为进一步提升降本效果,充分利用抢占式实例的价格优势,我们对 Spark 进行改造,实现了 ECI 实例类型自动转换的功能。Spark 任务的 Executor Pod 都优先运行在抢占式 ECI 实例上,当发生库存不足或其他原因无法申请创建抢占式实例,则自动切换为使用普通 ECI 实例,保证任务的正常运行。?具体实现原理和转换逻辑如下图所示。
由于 K8s 节点的磁盘容量很小,而且节点都是用时申请、用完释放的,无法保存大量的 Spark Shuffle 数据。如果对 Executor Pod 挂载云盘,挂载盘的大小难以确定,考虑到数据倾斜等因素,磁盘的使用率也会比较低,使用起来比较复杂。此外,虽然 Spark 社区在 3.2 提供了 Reuse PVC 等功能,但是调研下来觉得功能尚不完备且稳定性不足。
为解决 Spark 在 K8s 上数据 Shuffle 的问题,在充分调研和对比多家开源产品后,最终采用了阿里开源的 Celeborn 方案。Celeborn 是一个独立的服务,专门用于保存 Spark 的中间 Shuffle 数据,让 Executor 不再依赖本地盘,该服务 K8s 和 Yarn 均可以使用。Celeborn 采用了 Push Shuffle 的模式,Shuffle 过程为追加写、顺序读,提升数据读写性能和效率。
基于开源的 Celeborn 项目,我们内部也做了一些数据网络传输方面的功能增强、Metrics 丰富、监控告警完善、Bug 修复等工作,目前已形成了内部稳定版本。
Kyuubi 是一个分布式和多租户的网关,可以为 Spark、Flink 或 Trino 等提供 SQL 等查询服务。在早期,我们的 Spark Adhoc 查询是发送到 Kyuubi 上执行的。为了解决 Yarn 队列资源不足,用户的查询 SQL 无法提交和运行的问题,在 K8s 上我们也支持了 Kyuubi Server 的部署运行,当Yarn资源不足时,Spark 查询自动切换到 K8s 上运行。鉴于 Yarn 集群规模逐渐缩减,查询资源无法保证,以及保障相同的用户查询体验,目前我们已将所有的 SparkSQL Adhoc 查询提交到 K8s 上执行。
为了让用户的 Adhoc 查询也能在 K8s 上畅快运行,我们对 Kyuubi 也做了一些源码改造,包括对 Kyuubi 项目中 docker-image-tool.sh、Deployment.yaml、Dockfile 文件的改写,重定向 Log 到 OSS 上,Spark Operator 管理支持、权限控制、便捷查看任务运行 UI 等。
在 Spark on K8s 场景下,尽管 K8s 有集群层面的监控告警,但是还不能完全满足我们的需求。在生产环境中,我们更加关注的是在集群上的 Spark 任务、Pod 状态、资源消耗以及 ECI 等运行情况。利用 K8s 的 Watch 机制,我们实现了自己的监控告警服务 K8s Manager,下图所示为该服务的示意图。
K8sManager 是内部实现的一个比较轻量的 Spring Boot 服务,实现的功能就是对各个 K8s 集群上的 Pod、Quota、Service、ConfigMap、Ingress、Role 等各类资源信息监听和汇总处理,从而生成自定义的 Metrics 指标,并对指标进行展示和异常告警,其中包括集群 CPU 与 Memory 总使用量、当前运行的 Spark 任务数、Spark 任务内存资源消耗与运行时长 Top 统计、单日 Spark 任务量汇总、集群 Pod 总数、Pod 状态统计、ECI 机器型号与可用区分布统计、过期资源监控等等,这里就不一一列举了。
在我们的调度系统中,Spark 任务支持配置 Yarn、K8s、Auto 三种执行策略。如果用户任务指明了需要运行使用的资源管理器,则任务只会在 Yarn 或 K8s 上运行,若用户选择了 Auto,则任务具体在哪里执行,取决于当前 Yarn 队列的资源使用率,如下图所示。由于总任务量较大,且 Hive 任务也在不断迁移至 Spark,目前仍然有部分任务运行在 Yarn 集群上,但最终的形态所有任务将由 K8s 来托管。
Spark 任务运行过程中大量使用 ECI,ECI 创建成功有两个前提条件: 1、能够申请到 IP 地址;2、当前可用区有库存。?实际上,单个交换机提供的可用IP数量有限,单个可用区拥有的抢占式实例的总个数也是有限的,因此在实际生产环境中,无论是使用普通 ECI 还是 Spot 类型的 ECI,比较好的实践方式是配置支持多可用区、多交换机。
由于在 Spark 任务提交时,都已明确指定了每个 Executor 的 Cpu、Memory 等型号信息,在任务结束 SparkContxt 关闭之前,我们可以从任务的中拿到每个 Executor 的实际运行时长,再结合单价,即可计算出 Spark 任务的大致花费。由于 ECI Spot 实例是随着市场和库存量随时变动的,该方式计算出来的单任务成本是一个上限值,主要用于反映趋势。
在上线初期任务量较少时,Spark Operator 服务运行良好,但随着任务不断增多,Operator 处理各类Event事件的速度越来越慢,甚至集群出现大量的 ConfigMap、Ingress、Service 等任务运行过程中产生的资源无法及时清理导致堆积的情况,新提交 Spark 任务的 Web UI 也无法打开访问。发现问题后,我们调整了 Operator 的协程数量,并实现对 Pod Event 的批量处理、无关事件的过滤、TTL 删除等功能,解决了 Spark Operator 性能不足的问题。
Spark3.2.2 采用 fabric8(Kubernetes Java Client)来访问和操作 K8s 集群中的资源,默认客户端版本为 5.4.1,在此版本中,当任务结束 Executor 集中释放时,Driver 会大量发送 Delete Pod 的 Api 请求到 K8s Apiserver 上,对集群 Apiserver 和 ETCD 造成较大的压力,Apiserver 的 cpu 会瞬间飙高。
目前我们的内部 Spark 版本,已将 kubernetes-client 升级到 6.2.0,支持 pod 的批量删除,解决 Spark 任务集中释放时,由大量的删除 Api 请求操作的集群抖动。
在整个 Spark on K8s 的方案设计以及实施过程中,我们也遇到了各种各样的问题、瓶颈和挑战,这里做下简单的介绍,并给出我们的解决方案。
弹性网卡释放速度慢的问题,属于 ECI 大规模应用场景下的性能瓶颈,该问题会导致交换机上 IP 的剧烈消耗,最终导致 Spark 任务卡住或提交失败,具体触发原因如下图所示。目前阿里云团队已通过技术升级改造解决,并大幅提升了释放速度和整体性能。
Spark 任务在启动 Driver 时,会创建对 Executor 的事件监听器,用于实时获取所有 Executor 的运行状态,对于一些长时运行的 Spark 任务,这个监听器往往会由于资源过期、网络异常等情况而失效,因此在此情况下,需要对 Watcher 进行重置,否则任务可能会跑飞。该问题属于 Spark 的一个 Bug,当前我们内部版本已修复,并将 PR 提供到了 Spark 社区。
如上图所示,Driver 通过 List 和 Watch 两种方式来获取 Executor 的运行状况。Watch 采用被动监听机制,但是由于网络等问题可能会发生事件漏接收或漏处理,但这种概率比较低。List 采用主动请求的方式,比如每隔 3 分钟,Driver 可向 Apiserver 请求一次自己任务当前全量 Executor 的信息。
由于 List 请求任务所有 Pod 信息,当任务较多时,频繁 List 对 K8s 的 Apiserver 和 ETCD 造成较大压力,早期我们关闭了定时 List,只使用 Watch。当 Spark 任务运行异常,比如有很多 Executor OOM 了,有一定概率会导致 Driver Watch 的信息错误,尽管 Task 还没有运行完,但是 Driver 却不再申请 Executor 去执行任务,发生任务卡死。对此我们的解决方案如下:
ApacheCeleborn 是阿里开源的一款产品,前身为 RSS(Remote Shuffle Service)。在早期成熟度上还略有欠缺,在对网络延迟、丢包异常处理等方面处理的不够完善,导致线上出现一些有大量 Shuffle 数据的 Spark 任务运行时间很长、甚至任务失败,以下三点是我们针对此问题的解决办法。
为了防止资源被无限使用,我们对每个 K8s 集群都设置了 Quota 上限。在 K8s 中,Quota 也是一种资源,每一个 Pod 的申请与释放都会修改 Quota 的内容(Cpu/Memory 值),当很多任务并发提交时,可能会发生 Quota 锁冲突,从而影响任务 Driver 的创建,任务启动失败。
应对这种情况导致的任务启动失败,我们修改 Spark Driver Pod 的创建逻辑,增加可配置的重试参数,当检测到 Driver Pod 创建是由于 Quota 锁冲突引起时,进行重试创建。Executor Pod 的创建也可能会由于 Quota 锁冲突而失败,这种情况可以不用处理,Executor 创建失败 Driver 会自动申请创建新的,相当于是自动重试了。
当瞬时批量提交大量任务到集群时,多个 Submit Pod 会同时启动,并向 Terway 组件申请 IP 同时绑定弹性网卡,存在一定概率出现以下情况,即 Pod 已经启动了,弹性网卡也绑定成功但是实际并没有完全就绪,此时该 Pod 的网络通信功能实际还无法正常使用,任务访问 Core DNS 时,请求无法发出去,Spark 任务报错 UnknownHost 并运行失败。该问题我们通过下面这两个措施进行规避和解决:
为保障库存的充足,各 K8s 集群都配置了多可用区,但跨可用区的网络通信要比同可用区之间通信的稳定性略差,即可用区之间就存在一定概率的丢包,表现为任务运行时长不稳定。
对于跨可用区存在网络丢包的现象,可尝试将 ECI 的调度策略设定为 VSwitchOrdered,这样一个任务的所有 Executor 基本都在一个可用区,避免了不同可以区 Executor 之间的通信异常,导致的任务运行时间不稳定的问题。
最后,非常感谢阿里云容器、ECI、EMR 等相关团队的同学,在我们整个技术方案的落地与实际迁移过程中,给予了非常多的宝贵建议和专业的技术支持。
目前新的云原生架构已在生产环境上稳定运行了近一年左右的时间,在未来,我们将持续对整体架构进行优化和提升,主要围绕以下几个方面:
持续优化云原生的整体方案,进一步提升系统承载与容灾能力
云原生架构升级,更多大数据组件容器化,让整体架构更加彻底的云原生化
更加细粒度的资源管理和精准的成本控制