对于运行失败的 Task,TaskSetManager 会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的 Task 池子中等待重新执行,当重试次数过允许的最大次数,整个 Application失败。在记录 Task 失败次数过程中,TaskSetManager 还会记录它上一次失败所在的 ExecutorId 和 Host,这样下次再调度这个 Task 时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。
1、主要作用:维护 waiting jobs 和 active jobs 两个队列,维护 waiting stages、active stages 和 failed stages,以及与 jobs 的映射关系。
2、工作机制:DAGScheduler 拿到一个 JOB, 会切分成多个 Stage,从 job 的后面往前寻找 shuffle 算子。如果找到一个 shuffle 算子,就切开,已经找到的 RDD 的执行链就自成一个 Stage,放入到一个栈中。将来 DAGScheduler 要把这个栈中的每个 stage 拿出来,提交给 TaskScheduler。
3、核心描述
TaskScheduler:DAGScheduler 把 Stage 变成 TaskSet,然后交由 TaskScheduler 执行任务分发
1、TaskScheduler 本身是个接口,Spark 里只实现了一个 TaskSchedulerImpl,理论上任务调度可以定制。
2、维护 task 和 executor 对应关系,executor 和物理资源对应关系,在排队的 task 和正在跑的 task。
3、维护内部一个任务队列,根据 FIFO 或 Fair 策略,调度任务。
4、TaskScheduler 有两个重要的成员变量:
(1)DAGScheduler:负责 job 中的 stage 切分,
(2)SchedulerBackend:执行 Task 的分发
无所不包容的一个容器,spark App 在运行过程中的,各种信息都存储在 SparkContext 中。SparkContext / SparkSession 是 Spark Application 运行时上下文对象,包含了很多其他功能组件。
1、SparkContext 是用户通往 Spark 集群的唯一入口,可以用来在 Spark 集群中创建 RDD、累加器 Accumulator 和广播变量 Braodcast Variable,但是你想要的功能,你问他就行。
2、SparkContext 也是整个 Spark 应用程序中至关重要的一个对象,可以说是整个应用程序运行调度的核心(不是指资源调度)
3、SparkContext 在实例化的过程中会初始化 DAGScheduler、TaskScheduler 和 SchedulerBackend
4、SparkContext 会调用 DAGScheduler 将整个 Job 划分成几个小的阶段(Stage),TaskScheduler 会调度每个 Stage 的任务 (Task) 应该如何处理。另外,SchedulerBackend 管理整个集群中为这个当前的应用分配的计算资源 (Executor)
SparkConf 是 Spark 中用来管理配置一个的管理类,类似于 Hadoop 中的 Configuration。
SparkEnv 是 Spark Application 在运行的时候,所需要的各种功能组件的一个整合体,类似于 Hadoop 中的 Context。
1、重点:获取程序编写入口 SparkContext:new SparkContext(sparkConf)(新版本: SparkSession.getOrCreate();)
2、通过 SparkContext 来加载数据源得到数据抽象对象:RDD
3、针对数据抽象对象 RDD 调用各种算子执行各种逻辑计算:lazy,延迟到 action 的内部来执行
4、重点:调用 action 算子触发任务的提交执行:sparkContext.runJob()
5、处理结果并且关闭资源
Spark Application 提交执行过程中的消息交互,大致如下:在通过 spark-submit 提交一个 App 运行的时候,其实是执行 SparkSubmit 这个类,启动 Client,在它的内部,启动一个 RPC 客户端。
最容易造成误解的两个概念:
01、理解业务,了解 Spark 编程,编写业务代码实现
02、将应用程序项打成 jar 包
03、通过 spark-submit 脚本来提交,在提交的时候,可以指定资源系统类型
04、执行 SparkSubmit 类的 main ()
05、在标准的 Spark Standalone 集群中:转交给 ClientApp 的类来执行,如果是 Spark on yarn,则客户端类是:YarnClusterApplication
06、会在 ClientApp 内部初始化 CLientEndponit(存在于client中) 的组件:发送 RequestSubmitDriver 给 Master
07、Master 处理 RequestSubmitDriver 消息: 注册 Driver, 启动 Driver,返回 SubmitDriverResponse 消息给 ClientApp 的 CLientEndponit
08、启动 Driver: java DriverWrapper 这个类,转到: DriverDrapper main() 方法
09、通过反射的方式启动和执行我们自己写的业务代码的 main() 方法: JavaWordCount.main()
10、自己编写的业务代码中的第一句代码: 初始化 SparkSessoin(SparkConf, SparkContext)
11、初始化 SparkContext: TaskScheduler SchedulerBackend(DriverEndpoint CLientEndpoint) DAGScheduler
12、应用注册:ClientEndpoint 发送 RegisterApplication 消息给 Master, 返回 RegisteredApplication
13、Master 发送消息 LaunchExecutor 给 Worker, 启动 Executor(真正启动的是:ExecutorBackend)
14、Executor 启动了,则初始化一个线程池,等待 Driver 分发任务过来,由线程池执行,Executor 启动好了之后,会向 Driver 注册,同时也会向 Master 反馈
15、当一个 Spark job 的 Driver 和 Executor 都启动好了之后,那么意味着 Spark Context 的初始化就搞定了
16、接下来就是 Action 算子执行,触发 job 的提交:sparkContext.runJob();
17、SparkContext 中的 DAGScheduler 对应用代码构建的 DAG 进行 stage 切分
18、SparkContext 中的 TaskScheduler 对当前 Stage 进行提交执行:发送 LaunchTask 消息给 Executor,在 Executor 中的一个线程中启动一个 Task