Flink|《Flink 官方文档 - 部署 - 细粒度资源管理》学习笔记

发布时间:2024年01月20日

学习文档:《Flink 官方文档 - 部署 - 细粒度资源管理》

学习笔记如下:


适用场景

细粒度资源管理的应用场景:

  • Tasks 有显著不同的并行度
  • 整个 pipeline 需要的资源太大了,以致于不能与单一的 Slot/TaskManager 相适应
  • 批处理作业,其中不同 stage 的 tasks 所需的资源差异明显
工作原理

在这里插入图片描述

在执行 task 时,TaskManager 中的资源会被分割成许多个 slots。slot 既是资源调度的基本单元,又是 Flink 运行过程中申请资源的基本单元。

对于非细粒度资源管理,TaskManager 以固定相同的 slots 的个数的方式来满足资源需求。对于细粒度资源管理,slot 资源请求包含用户指定的特定的资源配置信息。Flink 会根据这些用户指定的资源请求,从 TaskManager 的可用资源中动态地切分出精确匹配的 slot。

对于没有指定资源配置的资源请求,Flink 会自动决定资源配置。粗粒度资源管理当前使用 TaskManager 总资源和 TaskManager 的总 slot 数(taskmanager.numberOfTaskSlots)来计算每个 slot 的资源。

例如,上图右侧所示,TaskManager 的总资源是 1Core 和 4GB 内存,task 的 slot 数设置为 2,Slot 2 被创建,并申请 0.5 Core 和 2GB 的内存而没有指定资源配置。在分配 Slot 1和 Slot 2 后,在 TaskManager 留下 0.25 Core 和 1GB 的内存作为未使用资源。

用法

使用细粒度的资源管理,需要指定资源请求。这个资源请求是基于 slot 共享组定义的。一个 slot 共享组是一个切入点,这意味着在 TaskManager 中的算子和 tasks 可以被置于相同的 slot。具体地:

  • 定义 Slot 共享组和它所包含的操作算子,可以调用 slotSharingGroup(String name) 来绑定 slot 共享组
  • 指定 Slot 共享组的资源,在实例化 SlotSharingGroup 时可以设置资源

示例:设置 slot 共享组

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SlotSharingGroup ssgA = SlotSharingGroup.newBuilder("a")
.setCpuCores(1.0)
.setTaskHeapMemoryMB(100)
.build();

SlotSharingGroup ssgB = SlotSharingGroup.newBuilder("b")
.setCpuCores(0.5)
.setTaskHeapMemoryMB(100)
.build();

someStream.filter(...).slotSharingGroup("a") // 设置Slot共享组的名字为‘a’ 
.map(...).slotSharingGroup(ssgB); // 直接设置Slot共享组的名字和资源.

env.registerSlotSharingGroup(ssgA); // 注册共享组的资源

在构造 SlotSharingGroup 时,可以为 Slot 共享组设置以下资源:

  • CPU核数:定义作业所需要的 CPU 核数,必须是正值
  • Task堆内存:定义作业所需要的堆内存,必须是正值
  • 堆外内存:定义作业所需要的堆外内存,可设置为 0
  • 管理内存:定义作业所需要的管理内存,可设置为 0
  • 外部资源:定义需要的外部资源,可设置为空

示例:设置一个 slot 共享组的资源

// 通过指定资源直接构建一个 slot 共享组
SlotSharingGroup ssgWithResource =
 SlotSharingGroup.newBuilder("ssg")
     .setCpuCores(1.0) // required
     .setTaskHeapMemoryMB(100) // required
     .setTaskOffHeapMemoryMB(50)
     .setManagedMemory(MemorySize.ofMebiBytes(200))
     .setExternalResource("gpu", 1.0)
     .build();

// 构建一个 slot 共享组未指定资源,然后在 StreamExecutionEnvironment 中注册资源
SlotSharingGroup ssgWithName = SlotSharingGroup.newBuilder("ssg").build();
env.registerSlotSharingGroup(ssgWithResource);

局限

因为细粒度资源管理是新的实验性特性,所以存在如下局限:

  • 不支持弹性伸缩:弹性伸缩目前只支持不指定资源的 slot 请求
  • 与 Flink Web UI 的集成有限:在细粒度的资源管理中,Slots会有不同的资源规格;目前 Web UI 页面只显示 slot 数量而不显示具体详情
  • 与批作业集成有限:目前细粒度资源管理需要在所有边缘都被阻塞的情况下执行批处理工作负载。为了达到该实现,需要将 fine-grained.shuffle-mode.all-blocking 设置为 true,注意这样可能会影响性能。
  • 不建议使用混合资源需求:不建议仅为工作的某些部分指定资源需求,而未指定其余部分的需求。目前,任何资源的插槽都可以满足未指定的要求。它获取的实际资源可能在不同的作业执行或故障切换中不一致。
  • slot分配结果可能不是最优:正因为 slot 需求包含资源的多维度方面,所以 slot 分配实际上是一个多维度问题,这是一个 NP 难题;因些,在一些使用场景中,默认的资源分配策略可能不会使得 slot 分配达到最优,而且还会导致资源碎片或者资源分配失败。

注意

  • 设置 Slot 共享组可能改变性能:为可链式操作的算子设置不同的 slot 共享组可能会导致链式操作 operator chains 产生割裂,从而改变性能
  • slot 共享组不会限制算子的调度:slot 共享组仅仅意味着调度器可以使被分组的算子被部署到同一个 slot 中,但无法保证将被分组的算子部署在一起。如果被分组算子被部署到单独的 slot 中,slot 资源将从特定的资源组需求中派生而来。

深入讨论

在使用粗粒度的资源管理时,就相当于简单地把所有 tasks 放入一个 slot 共享组中运行:

  • 对于许多有相同并行度的 tasks 的流作业而言,每个 slot 会包含整个 pipeline;此时,在理想情况条件下,所有的 pipeline 应该使用大致相同的资源
  • tasks 的资源消耗随时间变化不同,当一个 task 的资源消耗减少,多余的资源可以被另外一个 task 使用,这就是 “调峰填谷效应” 的现象,它降低了所需要的总体需求

综上所述,在拥有相同并行度、且整个 pipeline 的资源可以被单个 slot / TaskManager 适应的场景下,这样的资源利用率是很高的。

但是:

  • 当 tasks 有不同的并行度时,像 source / sink / lookup 这些类型的 task 的并行度可能被分区数和上下游系统的 IO 负载所限制,此时拥有更少 tasks 的 slot 会需要比拥有整个 pipeline 的 slot 更少的资源。
  • 当整个 pipeline 所需的资源过多而单个 slot / TaskManager 无法适应时,整个 pipeline 需要被分割成多个 SSG,它们可能不总是有相同的资源需求
  • 对于批处理作业,不是所有的 tasks 都能够被同时执行,因此,整个 pipeline 的瞬时资源需求随时间变化。

在以上情况下,使用相同的 slot 执行所有的 task 会造成非最优的资源利用率。因为相同 slot 的资源必须能够满足最高的资源需求,但对于其他资源需求来说是浪费的。在这种情况下,细粒度的资源管理运用不同资源的 slot 提高了资源利用率。

资源分配策略

实例:Flink 执行细粒度资源管理的过程

在这里插入图片描述

TaskManager 被以总资源的形式启动,但没有提前指定 slots。当一个 slot 请求 0.25 Core 和 1GB 的内存,Flink 将会选择一个有足够可用资源的 TaskManger 和创建一个新的已经被资源申请的 slot。如果一个 slot 未被使用,它会将它的资源返回到 TaskManager 中的可用资源中去。

在当前的资源分配策略中,Flink 会遍历所有被注册的 TaskManagers 并选择第一个有充足资源的 TaskManager 来满足 slot 的资源请求。当没有 TaskManager 有足够的资源时,Flink 将会从 Kubernetes 或 YARN 分配一个新的 TaskManager。在当前的策略中,Flink 会根据用户配置分配相同的 Taskmanagers。

因为 TaskManager 的规格组合是预定义的,所以:

  • 集群中可能会存在资源碎片
  • 确保配置的 slot 共享组的资源组成不能大于 TaskManager 的总资源;否则,job 会失败,并抛出异常
文章来源:https://blog.csdn.net/Changxing_J/article/details/135686465
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。