编写小案例 (wordcount)
@Test
def wordCount(): Unit = {
// 1. 创建 sc 对象
val conf = new SparkConf().setMaster("local[6]").setAppName("wordCount_source")
val sc = new SparkContext(conf)
// 2. 创建数据集
val textRDD = sc.parallelize(Seq("hadoop spark", "hadoop flume", "spark sqoop"))
// 3.数据处理
// 3.1、拆词
val splitRDD = textRDD.flatMap(_.split(" "))
// 3.2、赋予初始词频
val tupleRDD = splitRDD.map((_, 1))
// 3.3、聚合统计词频
val reduceRDD = tupleRDD.reduceByKey(_ + _)
// 3.4、将结果转换为字符串
val strRDD = reduceRDD.map(item => s"${item._1},${item._2}")
// 4.结果获取
strRDD.collect().foreach(println(_))
// 5.关闭 sc
sc.stop()
}
// 一个_ 指代一个参数**
集群组成
在 Spark 部分的底层执行逻辑开始之前,还是要先认识一下 Spark 的部署情况,根据部署情祝,从而理解如何调度
针对上图,首先可以看到整体上在集群中运行的角色有如下几个:
Master Daemon
负责管理 Master 节点,协调资源的获取,以及连接 Worker 节点来运行 Executor ,是 Spark 集群中的协调节点
Worker Daemon
Workers 也称之为叫 Slaves, 是 Spark 集群中的计算节点,用于和 Master 交互并管理 Executor.
当一个 Spark Job 提交后,会创建 SparkContext, 后 Worker 会启动对应的 Executor,
Executor Backend
上面有提到 Worker 用于控制 Executor 的启停,其实 Worker 是通过 Executor Backend 来进行控制的,Executor
Backend 是一个进程是一个 JVM 实例持有一个 Executor 对象
逻辑执行图
对于上面代码中的 reduceRDD 如果使用 toDebugString 打印调试信息的话,会显式如下内容
根据这段内容,大致能得到这样的一张逻辑执行图
其实 RDD 并设有什么严格的逻辑执行图和物理执行图的概念,这里也只是借用这个概念,从而让整个RDD的原理可以解释,好理解
对于 RDD 的逻辑执行图,起始于第一个入口 RDD 的创建,结束于 Actio 算子执行之前,主要的过程就是生成一组互相有依赖关系
的 RDD,其并不会真的执行,只是表示 RDD 之间的关系,数据的流转过程
物理执行图
当触发 Action 执行的时候, 这一组互相依赖的 RDD 要被处理, 所以要转化为可运行的物理执行图, 调度到集群中执行.
因为大部分 RDD 是不真正存放数据的, 只是数据从中流转, 所以, 不能直接在集群中运行 RDD, 要有一种 Pipeline 的思想, 需要将这组 RDD 转为 Stage 和 Task, 从而运行 Task, 优化整体执行速度.
以上的逻辑执行图会生成如下的物理执行图, 这一切发生在 Action 操作被执行时.
从上图可以总结如下几个点
Stage
中, 每一个这样的执行流程是一个 Task
, 也就是在同一个 Stage 中的所有 RDD 的对应分区, 在同一个 Task 中执行