flink是一个开发框架,用于进行数据批处理,本文主要探讨Flink任务运行的的架构。由于在日常生产环境中,常用的是flink on yarn 和flink on k8s两种类型的模式,因此本文也主要探讨这两种类型的异同,以及不同角色的容错机制。
JM是一个独立的JVM进程,在HA场景下一个App能够同时启动多个JM,通常通过ZK进行选主,只有一个JM是active状态,其他的JM处于standby状态。
具有许多与协调 Flink 应用程序的分布式执行有关的职责:
JM在flink任务中起到总体的协调作用。
TM是一个独立的JVM进程,负责具体的任务执行。TM在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JM 处接收需要部署的 Task,部署启动后,与自己的上游建立连接,接收数据并处理。
是集群的工作节点,每个 TM 最少持有 1 个 Slot,Slot 是 Flink 执行 Job 时的最小资源分配单位,在 Slot 中运行着具体的 Task 任务。
Session-Cluster模式需要先启动集群,然后再提交作业,接着会向yarn申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享Dispatcher和ResourceManager;共享资源;适合规模小执行时间短的作业。
一个任务会对应一个Job,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
在 Flink-1.11 版本之前 Flink on yarn 有两种部署的模式, session 模式和 per-job 模式,但是这两种模式都存在一定的问题,所以在最新的 Flink-1.11 版本中引入了新的部署模式即 application 模式,支持 yarn 和 k8s。
1, 原本需要客户端做的三件事被转移到了JobManager里,也就是说main()方法在集群中执行(入口点位于ApplicationClusterEntryPoint),Deployer只需要负责发起部署请求了。
2,如果一个main()方法中有多个env.execute()/executeAsync()调用,在Application模式下,这些作业会被视为属于同一个应用,在同一个集群中执行(如果在Per-Job模式下,就会启动多个集群)
JM和AM是两个完全不同的东西,JM是控制Flink计算和任务资源的,而AM是控制yarn app运行和任务资源的。在Flink On Yarn模式中,JM运行在AM上,JM会和AM通信,资源的申请由AppMaster来完成,而任务的调度和执行则由JM完成,JM会通过与AppMaster通信来让TM的执行具体的任务。
任务提交流程图
执行过程
YARN-Cluster的执行,需要安装flink 客户端,并执行如下命令提交任务
bin/flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
Flink 2.3开始,Flink官方就开始支持Kubernetes作为新的资源调度模式。
总体提交流程如下
可以通过flink原生提交方式和 flink-on-k8s-operator提交 两种方式进行提交,两种方式实现上有些差异,但是总体流程是一致的。
1, flink原生提交方式
需要安装flink 客户端,并执行如下命令提交任务
./bin/flink run -m 192.168.244.131:8081 ./examples/batch/WordCount.jar --input aaa.txt --output bbb.txt/
2, flink-on-k8s-operator提交
flink-on-k8s-operator[2],可以让用户以CRD(CustomResourceDefinition) [4] 的方式提交和管理Flink作业。这种方式能够更好的利用k8s原生的能力,具备更好的扩展性。并且在此之上增加了定时任务、重试、监控等一系列功能。具体的功能特性可以在github查看官方文档(kubernetes官方推荐)
需要
1, 需要提前在k8s集群中安装,此时会启动一个名为flinkoperator的pod
2,定义提交flink任务的相关CRD资源
3,提交作业时,无需准备一个具备Flink环境的Client,直接通过kubectl或者kubernetes api就可以提交Flink作业。
列入一个crd,命名flink.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment # Flink集群在K8s的资源类型
metadata:
name: basic-example # 作业的名字
namespace: flink # 指定在flink命名空间下运行
spec:
image: flink:1.13.6 # Flink的镜像,改为使用1.13.6
# 如果官方Flink镜像下载不了,可以使用此镜像
# image: registry.cn-hangzhou.aliyuncs.com/cm_ns01/flink:1.13.6
flinkVersion: v1_13 # Flink的版本,与镜像版本保持一致
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
resource:
memory: "1024m"
cpu: 1
taskManager:
resource:
memory: "1024m"
cpu: 1
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar # Flink作业的启动类所在的Jar包路径
parallelism: 2
upgradeMode: stateless
执行如下命令即可启动相关的pod,并进行提交任务
kubectl apply -f flink.yaml
同时部署多个JM,基于ZK进行选主,一个Job只有一个JM是active,其余是standby,如果active异常,standby进行竞争选主,进行HA容灾。
TM异常是日常生产环境中最常遇到的现象,造成的原因很多,最常见的是由于机器故障,从而导致就上运行的TM异常。
TM异常退出时,JM没有在规定时间内收到执行器的状态更新,于是JM会将注册的TM移除,并通过调度器自动重新拉起TM。新启动的TM会重新注册到JM中,JM会根据DAG给TM重新分配相关的Task。TM分配到到来自JM的Task,需要重checkpoint重新加载数据并继续执行计算。Flink运算数据行程DAG,如果遇到不同的TM之间有数据交互时(比如TMA的数据聚合依赖于TMB和TMC,TMB宕机,TMA的数据聚合也不准确),不能简单的通过启动对应的TM相关的数据进行恢复(可能会有数据紊乱),通常恢复的时间较久。
TM之间由于有复杂的数据交互,难以通过DAG重新计算局部TM任务资源恢复的成本很高,高于使用checkpoint进行任务恢复的成本,因此通常的做法是基于check进行任务重启。
可以,基于ZK进行选主。