核心入口:
JobMaster.onStart();
部署 Task 链条:JobMaster --> DefaultScheduler --> SchedulingStrategy --> ExecutionVertex --> Execution --> RPC请求 --> TaskExecutor
JobMaster 向 TaskExecutor 发送 submitTask() 的 RPC 请求,用来部署 StreamTask 运行。TaskExecutor 接收到 JobMaster 的部署 Task 运行的 RPC 请求的时候,就封装了一个 Task 抽象,然后通过一个线程来启动这个 Task。
在 Task 的构造方法中,也做了一些相应的初始化动作,大致总结一下;
所有 channel 共用同一个 netty 客户端,通过 inputChannelId 区分消息是属于哪个 inputChannel。同理,所有 ResultSubPartition 共用同一个 netty 服务端,通过 channelIndex 区分消息是属于哪个 ResultSubPartition。
Task 的启动,是通过启动 Task 对象的内部 executingThread 来执行 Task 的,具体逻辑在 run 方法中。当 invokable.invoke(); 执行的时候,Task 就真正执行起来了。根据上述代码的执行可知:一个 Task 的状态周期:
CREATED ----> DEPLOYING ----> INITIALIZING ----> RUNNING ----> FINISHED
这个地方的初始化,指的就是 SourceStreamTask 和 OneInputStreamTask 的实例对象的构建。
Task 这个类,只是一个笼统意义上的 Task,就是一个通用 Task 的抽象,不管是批处理的,还是流式处理的,不管是源 Task, 还是逻辑处理 Task, 都被抽象成 Task 来进行调度执行。
在 SourceStreamTask 的 processInput() 方法中,主要是启动接收数据的线程 LegacySourceFunctionThread。
在 Flink-1.9 之前,StreamTask 中的多线程互斥通过一个 CheckpointLock 来解决。Flink 从 1.9 版本之后引入基于 Actor 模型的 Mailbox 设计理念来取代 StreamTask 中现有的多线程模型,变为了单线程(MailboxProcessor) + 阻塞队列(Mailbox) 的形式。
SourceStreamTask 与 OneInputStreamTask/TwoInputStreamTask 的不同之处在于 mainOperator 中的 userFunction。
SourceStreamTask 的 processInput() 内部 通过 LegacySourceFunctionThread 来对接数据源,不停的获取一条一条的数据,通过 output 组件交给后面。
OneInputStreamTask 或者 TwoInputStreamTask 是通过 StreamInputProcessor 来获取输入数据,然后执行处理。
RecordWriter 的具体实现是什么呢:
关于 OneInputStreamTask 获取 Buffer 数据的时候,其实涉及到两部分的逻辑:
当 StreamTask 接收到某个 InputChannel 发送过来的数据的时候,就会把这个 InputChannel 和 Buffer 数据加入到 inputChannelsWithData 队列中,然后环境 pollNextBuffer 的执行逻辑,就能获取到 Buffer,执行接下来的 processElement 方法了。
Flink Task 的 InputChannel 数据处理就是典型的生产者消费者模式(wait + notifyAll)
SourceStreamTask 和 OneInputStreamTask 在接收到数据执行处理之后,都通过一个 output 执行向下输出。