Apache Spark是一个开源的通用集群计算系统,它提供了High-level编程API,支持Scala、Java和Python三种编程语言。Spark内核使用Scala语言编写,通过基于Scala的函数式编程特性,在不同的计算层面进行抽象,代码设计非常优秀。
RDD(Resilient Distributed Datasets),弹性分布式数据集,它是对分布式数据集的一种内存抽象,通过受限的共享内存方式来提供容错性,同时这种内存模型使得计算比传统的数据流模型要高效。RDD具有5个重要的特性,如下图所示:
上图展示了2个RDD进行JOIN操作,体现了RDD所具备的5个主要特性,如下所示:
1)一组分区
2)计算每一个数据分片的函数
3)RDD上的一组依赖
4)可选,对于键值对RDD,有一个Partitioner(通常是HashPartitioner)
5)可选,一组Preferred location信息(例如,HDFS文件的Block所在location信息)
有了上述特性,能够非常好地通过RDD来表达分布式数据集,并作为构建DAG图的基础:首先抽象一次分布式计算任务的逻辑表示,最终将任务在实际的物理计算环境中进行处理执行。
在描述Spark中的计算抽象,我们首先需要了解如下几个概念:
1)Application
用户编写的Spark程序,完成一个计算任务的处理。它是由一个Driver程序和一组运行于Spark集群上的Executor组成。
2)Job
用户程序中,每次调用Action时,逻辑上会生成一个Job,一个Job包含了多个Stage。
3)Stage
Stage包括两类:ShuffleMapStage和ResultStage,如果用户程序中调用了需要进行Shuffle计算的Operator,如groupByKey等,就会以Shuffle为边界分成ShuffleMapStage和ResultStage。
4)TaskSet
基于Stage可以直接映射为TaskSet,一个TaskSet封装了一次需要运算的、具有相同处理逻辑的Task,这些Task可以并行计算,粗粒度的调度是以TaskSet为单位的。
5)Task
Task是在物理节点上运行的基本单位,Task包含两类:ShuffleMapTask和ResultTask,分别对应于Stage中ShuffleMapStage和ResultStage中的一个执行基本单元。
下面,我们看一下,上面这些基本概念之间的关系,如下图所示:
上图,为了简单,每个Job假设都很简单,并且只需要进行一次Shuffle处理,所以都对应2个Stage。实际应用中,一个Job可能包含若干个Stage,或者是一个相对复杂的Stage DAG。
在Standalone模式下,默认使用的是FIFO这种简单的调度策略,在进行调度的过程中,大概流程如下图所示:
从用户提交Spark程序,最终生成TaskSet,而在调度时,通过TaskSetManager来管理一个TaskSet(包含一组可在物理节点上执行的Task),这里面TaskSet必须要按照顺序执行才能保证计算结果的正确性,因为TaskSet之间是有序依赖的(上溯到ShuffleMapStage和ResultStage),只有一个TaskSet中的所有Task都运行完成后,才能调度下一个TaskSet中的Task去执行。
先从Executor和SchedulerBackend说起。Executor是真正执行任务的进程,本身拥有若干cpu和内存,可以执行以线程为单位的计算任务,它是资源管理系统能够给予的最小单位。SchedulerBackend是spark提供的接口,定义了许多与Executor事件相关的处理,包括:新的executor注册进来的时候记录executor的信息,增加全局的资源量(核数),进行一次makeOffer;executor更新状态,若任务完成的话,回收core,进行一次makeOffer;其他停止executor、remove executor等事件。下面由makeOffer展开。
makeOffer的目的是在有资源更新的情况下,通过调用scheduler的resourceOffers方法来触发它对现有的任务进行一次分配,最终launch新的tasks。这里的全局 scheduler就是TaskScheduler,实现是TaskSchedulerImpl,它可以对接各种SchedulerBackend的实现,包括standalone的,yarn的,mesos的。SchedulerBackend在做makeOffer的时候,会把现有的 executor资源以WorkerOfffer列表的方式传给scheduler,即以worker为单位,将worker信息及其内的资源交给 scheduler。scheduler拿到这一些集群的资源后,去遍历已提交的tasks并根据locality决定如何launch tasks。
TaskScheduler里,resourceOffers方法会将已经提交的tasks进行一次优先级排序,这个排序算法目前是两种:FIFO或FAIR。得到这一份待运行的tasks后,接下里就是要把schedulerBackend交过来的worker资源信息合理分配给这些tasks。分配前,为了避免每次都是前几个worker被分到tasks,所以先对WorkerOffer列表进行一次随机洗牌。接下来就是遍历tasks,看workers的资源“够不够”,“符不符合”task,ok 的话task就被正式launch起来。注意,这里资源"够不够"是很好判断的,在TaskScheduler里设置了每个task启动需要的cpu个数,默认是1,所以只需要做核数的大小判断和减1操作就可以遍历分配下去。而"符不符合"这件事情,取决于每个tasks的locality设置。
task 的locality有五种,按优先级高低排:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY。也就是最好在同个进程里,次好是同个node(即机器)上,再次是同机架,或任意都行。task有自己的locality,如果本次资源里没有想要的 locality资源,怎么办呢?spark有一个spark.locality.wait参数,默认是3000ms。对于 process,node,rack,默认都使用这个时间作为locality资源的等待时间。所以一旦task需要locality,就可能会触发delay scheduling。
到这里,对于任务的分配,资源的使用大致有个了解。实际上,TaskScheduler的resourceOffer里还触发了 TaskSetManager的resourceOffer方法,TaskSetManager的resourceOffer是会检查task的 locality并最终调用DAGScheduler去launch这个task。这些类的名字以及他们彼此的调用关系,看起来是比较乱的。我简单梳理下。
这件事情要从Spark的DAG切割说起。Spark RDD通过其transaction和action操作,串起来形成了一个DAG。action的调用,触发了DAG的提交和整个job的执行。触发之后,由DAGScheduler这个全局唯一的面向stage的DAG调度器来切分DAG,根据是否 shuffle来切成多个小DAG,即stage。凡是RDD之间是窄依赖的,都归到一个stage里,这里面的每个操作都对应成MapTask,并行度就是各自RDD的partition数目。凡是遇到宽依赖的操作,那么就把这一次操作切为一个stage,这里面的操作对应成ResultTask,结果 RDD的partition数就是并行度。MapTask和ResultTask分别可以简单理解为传统MR的Map和Reduce,切分他们的依据本质上就是shuffle。所以shuffle之前,大量的map是可以同partition内操作的。每个stage对应的是多个MapTask或多个 ResultTask,这一个stage内的task集合成一个TaskSet类,由TaskSetManager来管理这些task的运行状态,locality处理(比如需要delay scheduling)。这个TaskSetManager是Spark层面上的,如何管理自己的tasks,即任务线程,这一层与底下资源管理是剥离的。我们上面提到的TaskSetManager的resourceOffer方法,是task与底下资源的交互,这个资源交互的协调人是 TaskScheduler,也是全局的,TaskScheduler对接的是不同的SchedulerBackend的实现(比如 mesos,yarn,standalone),如此来对接不同的资源管理系统。同时,对资源管理系统来说,他们要负责的是进程,是worker上起几个进程,每个进程分配多少资源。所以这两层很清楚,spark本身计算框架内管理线程级别的task,每个stage都有一个TaskSet,本身是个小DAG,可以丢到全局可用的资源池里跑;spark下半身的双层资源管理部分掌控的是进程级别的executor,不关心task怎么摆放,也不关心task运行状态,这是TaskSetManager管理的事情,两者的协调者就是TaskScheduler及其内的SchedulerBackend实现。
SchedulerBackend 的实现,除去local模式的不说,分为细粒度和粗粒度两种。细粒度只有Mesos(mesos有粗细两种粒度的使用方式)实现了,粗粒度的实现者有 yarn,mesos,standalone。拿standalone模式来说粗粒度,每台物理机器是一个worker,worker一共可以使用多少 cpu和内存,启动时候可以指定每个worker起几个executor,即进程,每个executor的cpu和内存是多少。在我看来,粗粒度与细粒度的主要区别,就是粗粒度是进程long-running的,计算线程可以调到executor上跑,但executor的cpu和内存更容易浪费。细粒度的话,可以存在复用,可以实现抢占等等更加苛刻但促进资源利用率的事情。这俩概念还是AMPLab论文里最先提出来并在Mesos里实现的。AMPLab 在资源使用粒度甚至任务分配最优的这块领域有不少论文,包括Mesos的DRF算法、Sparrow调度器等。所以standalone模式下,根据 RDD的partition数,以及每个task需要的cpu数,可以很容易计算每台物理机器的负载量、资源的消耗情况、甚至知道TaskSet要分几批才能跑完一个stage。
Spark集群在设计的时候,并没有在资源管理的设计上对外封闭,而是充分考虑了未来对接一些更强大的资源管理系统,如YARN、Mesos等,所以Spark架构设计将资源管理单独抽象出一层,通过这种抽象能够构建一种适合企业当前技术栈的插件式资源管理模块,从而为不同的计算场景提供不同的资源分配与调度策略。Spark集群模式架构,如下图所示:
上图中,Spark集群Cluster Manager目前支持如下三种模式:
1)Standalone模式
Standalone模式是Spark内部默认实现的一种集群管理模式,这种模式是通过集群中的Master来统一管理资源,而与Master进行资源请求协商的是Driver内部的StandaloneSchedulerBackend(实际上是其内部的StandaloneAppClient真正与Master通信),后面会详细说明。
2)YARN模式
YARN模式下,可以将资源的管理统一交给YARN集群的ResourceManager去管理,选择这种模式,可以更大限度的适应企业内部已有的技术栈,如果企业内部已经在使用Hadoop技术构建大数据处理平台。
3)Mesos模式
随着Apache Mesos的不断成熟,一些企业已经在尝试使用Mesos构建数据中心的操作系统(DCOS),Spark构建在Mesos之上,能够支持细粒度、粗粒度的资源调度策略(Mesos的优势),也可以更好地适应企业内部已有技术栈。
那么,Spark中是怎么考虑满足这一重要的设计决策的呢?也就是说,如何能够保证Spark非常容易的让第三方资源管理系统轻松地接入进来。我们深入到类设计的层面看一下,如下图类图所示:
可以看出,Task调度直接依赖SchedulerBackend,SchedulerBackend与实际资源管理模块交互实现资源请求。这里面,CoarseGrainedSchedulerBackend是Spark中与资源调度相关的最重要的抽象,它需要抽象出与TaskScheduler通信的逻辑,同时还要能够与各种不同的第三方资源管理系统无缝地交互。实际上,CoarseGrainedSchedulerBackend内部采用了一种ResourceOffer的方式来处理资源请求。
Spark RPC层是基于优秀的网络通信框架Netty设计开发的,但是Spark提供了一种很好地抽象方式,将底层的通信细节屏蔽起来,而且也能够基于此来设计满足扩展性,比如,如果有其他不基于Netty的网络通信框架的新的RPC接入需求,可以很好地扩展而不影响上层的设计。RPC层设计,如下图类图所示:
任何两个Endpoint只能通过消息进行通信,可以实现一个RpcEndpoint和一个RpcEndpointRef:想要与RpcEndpoint通信,需要获取到该RpcEndpoint对应的RpcEndpointRef即可,而且管理RpcEndpoint和RpcEndpointRef创建及其通信的逻辑,统一在RpcEnv对象中管理。
Standalone模式下,Spark集群采用了简单的Master-Slave架构模式,Master统一管理所有的Worker,这种模式很常见,我们简单地看下Spark Standalone集群启动的基本流程,如下图所示:
可以看到,Spark集群采用的消息的模式进行通信,也就是EDA架构模式,借助于RPC层的优雅设计,任何两个Endpoint想要通信,发送消息并携带数据即可。上图的流程描述如下所示:
1)Master启动时首先创一个RpcEnv对象,负责管理所有通信逻辑
2)Master通过RpcEnv对象创建一个Endpoint,Master就是一个Endpoint,Worker可以与其进行通信
3)Worker启动时也是创一个RpcEnv对象
4)Worker通过RpcEnv对象创建一个Endpoint
5)Worker通过RpcEnv对,建立到Master的连接,获取到一个RpcEndpointRef对象,通过该对象可以与Master通信
6)Worker向Master注册,注册内容包括主机名、端口、CPU Core数量、内存数量
7)Master接收到Worker的注册,将注册信息维护在内存中的Table中,其中还包含了一个到Worker的RpcEndpointRef对象引用
8)Master回复Worker已经接收到注册,告知Worker已经注册成功
9)此时如果有用户提交Spark程序,Master需要协调启动Driver;而Worker端收到成功注册响应后,开始周期性向Master发送心跳
集群处理计算任务的运行时(用户提交了Spark程序),最核心的顶层组件就是Driver和Executor,它们内部管理很多重要的组件来协同完成计算任务,核心组件栈如下图所示:
Driver和Executor都是运行时创建的组件,一旦用户程序运行结束,他们都会释放资源,等待下一个用户程序提交到集群而进行后续调度。上图,我们列出了大多数组件,其中SparkEnv是一个重量级组件,他们内部包含计算过程中需要的主要组件,而且,Driver和Executor共同需要的组件在SparkEnv中也包含了很多。这里,我们不做过多详述,后面交互流程等处会说明大部分组件负责的功能。
在Standalone模式下,Spark中各个组件之间交互还是比较复杂的,但是对于一个通用的分布式计算系统来说,这些都是非常重要而且比较基础的交互。首先,为了理解组件之间的主要交互流程,我们给出一些基本要点:
一个Application会启动一个Driver
一个Driver负责跟踪管理该Application运行过程中所有的资源状态和任务状态
一个Driver会管理一组Executor
一个Executor只执行属于一个Driver的Task
核心组件之间的主要交互流程,如下图所示:
上图中,通过不同颜色或类型的线条,给出了如下6个核心的交互流程,我们会详细说明:
橙色:提交用户Spark程序
用户提交一个Spark程序,主要的流程如下所示:
1)用户spark-submit脚本提交一个Spark程序,会创建一个ClientEndpoint对象,该对象负责与Master通信交互
2)ClientEndpoint向Master发送一个RequestSubmitDriver消息,表示提交用户程序
3)Master收到RequestSubmitDriver消息,向ClientEndpoint回复SubmitDriverResponse,表示用户程序已经完成注册
4)ClientEndpoint向Master发送RequestDriverStatus消息,请求Driver状态
5)如果当前用户程序对应的Driver已经启动,则ClientEndpoint直接退出,完成提交用户程序
紫色:启动Driver进程
当用户提交用户Spark程序后,需要启动Driver来处理用户程序的计算逻辑,完成计算任务,这时Master协调需要启动一个Driver,具体流程如下所示:
1)Maser内存中维护着用户提交计算的任务Application,每次内存结构变更都会触发调度,向Worker发送LaunchDriver请求
2)Worker收到LaunchDriver消息,会启动一个DriverRunner线程去执行LaunchDriver的任务
3)DriverRunner线程在Worker上启动一个新的JVM实例,该JVM实例内运行一个Driver进程,该Driver会创建SparkContext对象
红色:注册Application
Dirver启动以后,它会创建SparkContext对象,初始化计算过程中必需的基本组件,并向Master注册Application,流程描述如下:
1)创建SparkEnv对象,创建并管理一些基本组件
2)创建TaskScheduler,负责Task调度
3)创建StandaloneSchedulerBackend,负责与ClusterManager进行资源协商
4)创建DriverEndpoint,其它组件可以与Driver进行通信
5)在StandaloneSchedulerBackend内部创建一个StandaloneAppClient,负责处理与Master的通信交互
6)StandaloneAppClient创建一个ClientEndpoint,实际负责与Master通信
7)ClientEndpoint向Master发送RegisterApplication消息,注册Application
8)Master收到RegisterApplication请求后,回复ClientEndpoint一个RegisteredApplication消息,表示已经注册成功
蓝色:启动Executor进程
1)Master向Worker发送LaunchExecutor消息,请求启动Executor;同时Master会向Driver发送ExecutorAdded消息,表示Master已经新增了一个Executor(此时还未启动)
2)Worker收到LaunchExecutor消息,会启动一个ExecutorRunner线程去执行LaunchExecutor的任务
3)Worker向Master发送ExecutorStageChanged消息,通知Executor状态已发生变化
4)Master向Driver发送ExecutorUpdated消息,此时Executor已经启动
粉色:启动Task执行
1)StandaloneSchedulerBackend启动一个DriverEndpoint
2)DriverEndpoint启动后,会周期性地检查Driver维护的Executor的状态,如果有空闲的Executor便会调度任务执行
3)DriverEndpoint向TaskScheduler发送Resource Offer请求
4)如果有可用资源启动Task,则DriverEndpoint向Executor发送LaunchTask请求
5)Executor进程内部的CoarseGrainedExecutorBackend调用内部的Executor线程的launchTask方法启动Task
6)Executor线程内部维护一个线程池,创建一个TaskRunner线程并提交到线程池执行
绿色:Task运行完成
1)Executor进程内部的Executor线程通知CoarseGrainedExecutorBackend,Task运行完成
2)CoarseGrainedExecutorBackend向DriverEndpoint发送StatusUpdated消息,通知Driver运行的Task状态发生变更
3)StandaloneSchedulerBackend调用TaskScheduler的updateStatus方法更新Task状态
4)StandaloneSchedulerBackend继续调用TaskScheduler的resourceOffers方法,调度其他任务运行
Block管理,主要是为Spark提供的Broadcast机制提供服务支撑的。Spark中内置采用TorrentBroadcast实现,该Broadcast变量对应的数据(Task数据)或数据集(如RDD),默认会被切分成若干4M大小的Block,Task运行过程中读取到该Broadcast变量,会以4M为单位的Block为拉取数据的最小单位,最后将所有的Block合并成Broadcast变量对应的完整数据或数据集。将数据切分成4M大小的Block,Task从多个Executor拉取Block,可以非常好地均衡网络传输负载,提高整个计算集群的稳定性。
通常,用户程序在编写过程中,会对某个变量进行Broadcast,该变量称为Broadcast变量。在实际物理节点的Executor上执行Task时,需要读取Broadcast变量对应的数据集,那么此时会根据需要拉取DAG执行流上游已经生成的数据集。采用Broadcast机制,可以有效地降低数据在计算集群环境中传输的开销。具体地,如果一个用户对应的程序中的Broadcast变量,对应着一个数据集,它在计算过程中需要拉取对应的数据,如果在同一个物理节点上运行着多个Task,多个Task都需要该数据,有了Broadcast机制,只需要拉取一份存储在本地物理机磁盘即可,供多个Task计算共享。
另外,用户程序在进行调度过程中,会根据调度策略将Task计算逻辑数据(代码)移动到对应的Worker节点上,最优情况是对本地数据进行处理,那么代码(序列化格式)也需要在网络上传输,也是通过Broadcast机制进行传输,不过这种方式是首先将代码序列化到Driver所在Worker节点,后续如果Task在其他Worker中执行,需要读取对应代码的Broadcast变量,首先就是从Driver上拉取代码数据,接着其他晚一些被调度的Task可能直接从其他Worker上的Executor中拉取代码数据。
我们通过以Broadcast变量taskBinary为例,说明Block是如何管理的,如下图所示:
上图中,Driver负责管理所有的Broadcast变量对应的数据所在的Executor,即一个Executor维护一个Block列表。在Executor中运行一个Task时,执行到对应的Broadcast变量taskBinary,如果本地没有对应的数据,则会向Driver请求获取Broadcast变量对应的数据,包括一个或多个Block所在的Executor列表,然后该Executor根据Driver返回的Executor列表,直接通过底层的BlockTransferService组件向对应Executor请求拉取Block。Executor拉取到的Block会缓存到本地,同时向Driver报告该Executor上存在的Block信息,以供其他Executor执行Task时获取Broadcast变量对应的数据。
用户通过spark-submit提交或者运行spark-shell REPL,集群创建Driver,Driver加载Application,最后Application根据用户代码转化为RDD,RDD分解为Tasks,Executor执行Task等系列知识,整体交互蓝图如下:
1)Client运行时向Master发送启动驱动申请(发送RequestSubmitDriver指令)
2)Master调度可用Worker资源进行驱动安装(发送LaunchDriver指令)
3)Worker运行DriverRunner进行驱动加载,并向Master发送应用注册请求(发送RegisterApplication指令)
4)Master调度可用Worker资源进行应用的Executor安装(发送LaunchExecutor指令)
5)Executor安装完毕后向Driver注册驱动可用Executor资源(发送RegisterExecutor指令)
6)最后是运行用户代码时,通过DAGScheduler,TaskScheduler封装为可以执行的TaskSetManager对象
7)TaskSetManager对象与Driver中的Executor资源进行匹配,在队形的Executor中发布任务(发送LaunchTask指令)
8)TaskRunner执行完毕后,调用DriverRunner提交给DAGScheduler,循环7.直到任务完成