学习文档:《Flink 官方文档 - 部署 - 细粒度资源管理》
学习笔记如下:
细粒度资源管理的应用场景:
在执行 task 时,TaskManager 中的资源会被分割成许多个 slots。slot 既是资源调度的基本单元,又是 Flink 运行过程中申请资源的基本单元。
对于非细粒度资源管理,TaskManager 以固定相同的 slots 的个数的方式来满足资源需求。对于细粒度资源管理,slot 资源请求包含用户指定的特定的资源配置信息。Flink 会根据这些用户指定的资源请求,从 TaskManager 的可用资源中动态地切分出精确匹配的 slot。
对于没有指定资源配置的资源请求,Flink 会自动决定资源配置。粗粒度资源管理当前使用 TaskManager 总资源和 TaskManager 的总 slot 数(taskmanager.numberOfTaskSlots
)来计算每个 slot 的资源。
例如,上图右侧所示,TaskManager 的总资源是 1Core 和 4GB 内存,task 的 slot 数设置为 2,Slot 2 被创建,并申请 0.5 Core 和 2GB 的内存而没有指定资源配置。在分配 Slot 1和 Slot 2 后,在 TaskManager 留下 0.25 Core 和 1GB 的内存作为未使用资源。
使用细粒度的资源管理,需要指定资源请求。这个资源请求是基于 slot 共享组定义的。一个 slot 共享组是一个切入点,这意味着在 TaskManager 中的算子和 tasks 可以被置于相同的 slot。具体地:
slotSharingGroup(String name)
来绑定 slot 共享组SlotSharingGroup
时可以设置资源示例:设置 slot 共享组
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SlotSharingGroup ssgA = SlotSharingGroup.newBuilder("a") .setCpuCores(1.0) .setTaskHeapMemoryMB(100) .build(); SlotSharingGroup ssgB = SlotSharingGroup.newBuilder("b") .setCpuCores(0.5) .setTaskHeapMemoryMB(100) .build(); someStream.filter(...).slotSharingGroup("a") // 设置Slot共享组的名字为‘a’ .map(...).slotSharingGroup(ssgB); // 直接设置Slot共享组的名字和资源. env.registerSlotSharingGroup(ssgA); // 注册共享组的资源
在构造 SlotSharingGroup 时,可以为 Slot 共享组设置以下资源:
示例:设置一个 slot 共享组的资源
// 通过指定资源直接构建一个 slot 共享组 SlotSharingGroup ssgWithResource = SlotSharingGroup.newBuilder("ssg") .setCpuCores(1.0) // required .setTaskHeapMemoryMB(100) // required .setTaskOffHeapMemoryMB(50) .setManagedMemory(MemorySize.ofMebiBytes(200)) .setExternalResource("gpu", 1.0) .build(); // 构建一个 slot 共享组未指定资源,然后在 StreamExecutionEnvironment 中注册资源 SlotSharingGroup ssgWithName = SlotSharingGroup.newBuilder("ssg").build(); env.registerSlotSharingGroup(ssgWithResource);
因为细粒度资源管理是新的实验性特性,所以存在如下局限:
fine-grained.shuffle-mode.all-blocking
设置为 true,注意这样可能会影响性能。在使用粗粒度的资源管理时,就相当于简单地把所有 tasks 放入一个 slot 共享组中运行:
综上所述,在拥有相同并行度、且整个 pipeline 的资源可以被单个 slot / TaskManager 适应的场景下,这样的资源利用率是很高的。
但是:
在以上情况下,使用相同的 slot 执行所有的 task 会造成非最优的资源利用率。因为相同 slot 的资源必须能够满足最高的资源需求,但对于其他资源需求来说是浪费的。在这种情况下,细粒度的资源管理运用不同资源的 slot 提高了资源利用率。
实例:Flink 执行细粒度资源管理的过程
TaskManager 被以总资源的形式启动,但没有提前指定 slots。当一个 slot 请求 0.25 Core 和 1GB 的内存,Flink 将会选择一个有足够可用资源的 TaskManger 和创建一个新的已经被资源申请的 slot。如果一个 slot 未被使用,它会将它的资源返回到 TaskManager 中的可用资源中去。
在当前的资源分配策略中,Flink 会遍历所有被注册的 TaskManagers 并选择第一个有充足资源的 TaskManager 来满足 slot 的资源请求。当没有 TaskManager 有足够的资源时,Flink 将会从 Kubernetes 或 YARN 分配一个新的 TaskManager。在当前的策略中,Flink 会根据用户配置分配相同的 Taskmanagers。
因为 TaskManager 的规格组合是预定义的,所以: