📋 博主简介
- 💖 作者简介:大家好,我是wux_labs。😜
热衷于各种主流技术,热爱数据科学、机器学习、云计算、人工智能。
通过了TiDB数据库专员(PCTA)、TiDB数据库专家(PCTP)、TiDB数据库认证SQL开发专家(PCSD)认证。
通过了微软Azure开发人员、Azure数据工程师、Azure解决方案架构师专家认证。
对大数据技术栈Hadoop、Hive、Spark、Kafka等有深入研究,对Databricks的使用有丰富的经验。- 📝 个人主页:wux_labs,如果您对我还算满意,请关注一下吧~🔥
- 📝 个人社区:数据科学社区,如果您是数据科学爱好者,一起来交流吧~🔥
- 🎉 请支持我:欢迎大家 点赞👍+收藏??+吐槽📝,您的支持是我持续创作的动力~🔥
大家好!今天为大家分享的是《PySpark大数据分析实战》第2章第2节的内容:宽窄依赖和阶段划分。
前面介绍过Stage中Task的数量就是Stage的并行度,也是RDD的分区数,从Spark Driver Web UI中可以看到每个Stage有两个Task,意味着程序中处理的RDD有两个分区。在Spark中,RDD的数据是只读的,不支持修改操作,如果要更新RDD中的数据,则只能将原有的RDD经过转换生成一个新的RDD,这个过程中新RDD与原有的RDD就存在依赖关系。现在来了解一下任务执行的详细情况,将任务按代码拆分开来看,并使用glom算子为分区数据添加嵌套。
textFile()加载文件后的分区数是两个,第1个分区的数据是[‘Hello Python’, 'Hello Spark You ', ‘Hello Python Spark’],第2个分区的数据是[‘You know PySpark’],如图所示。
flatMap()扁平化数据之后的分区数是两个,第1个分区的数据是[‘Hello’, ‘Python’, ‘Hello’, ‘Spark’, ‘You’, ‘Hello’, ‘Python’, ‘Spark’],第2个分区的数据是[‘You’, ‘know’, ‘PySpark’],如图所示。
map()转换数据之后的分区数是两个,第1个分区的数据是[(‘Hello’, 1), (‘Python’, 1), (‘Hello’, 1), (‘Spark’, 1), (‘You’, 1), (‘Hello’, 1), (‘Python’, 1), (‘Spark’, 1)],第2个分区的数据是[(‘You’, 1), (‘know’, 1), (‘PySpark’, 1)],如图所示。
在整个过程中,分区数都是保持两个不变,注意每个单词所在的分区情况,无论数据如何变化,Hello、Python、Spark、You这4个单词总是保持在第1个分区, You、know、PySpark这3个单词总是保持在第2个分区。reduceByKey(…)根据key值进行聚合后的数据分区情况,第1个分区的数据是[(‘Hello’, 3), (‘Python’, 2), (‘Spark’, 2), (‘know’, 1), (‘PySpark’, 1)],第2个分区的数据是[(‘You’, 2)],如图所示。
从分区数据可以看出,reduceByKey依然是两个分区,但是know、PySpark两个单词已经不再包含在第2个分区,而是到了第1个分区,第1个分区中的单词You则到了第2个分区。数据在RDD中的流转过程如图所示。
textFile将文件加载成包含两个分区的RDD,在filterMap和map过程中,每个分区的数据总是保留在自己所在的分区进行流转,上一个RDD的一个分区的数据完全流转到下一个RDD的一个分区,下一个RDD的一个分区的数据完全来自上一个RDD的一个分区,也就是子RDD的一个分区的数据仅依赖于父RDD的一个分区。在reduceByKey过程中,上一个RDD的一个分区的数据分别流向了下一个RDD的两个分区,而下一个RDD的一个分区的数据则同时来自于上一个RDD的两个分区,也就是子RDD的一个分区的数据依赖于父RDD的所有分区。
当父RDD的一个分区的数据仅流向子RDD的一个分区,在Spark中被称为窄依赖,filterMap、map算子是窄依赖的算子。窄依赖过程简单,不同分区的数据互不影响,可以并行执行,是最好进行优化的部分。
当父RDD的一个分区的数据流向子RDD的多个分区,在Spark中被称为宽依赖,reduceByKey算子是宽依赖的算子。宽依赖涉及到Shuffle,子RDD的数据需要依赖所有父RDD的Shuffle过程完成才能继续往下处理,因此过程复杂,优化相对复杂。
Hadoop中的MapReduce是计算向数据靠拢的,数据块在HDFS的哪个节点上就会在哪个节点上启动计算任务,但有些计算需要将各个节点上的同一类数据汇集到某一个节点进行计算,把这些分布在不同节点的数据按照一定的规则汇集到一起的过程称为Shuffle。在Hadoop中Shuffle是连接Map任务和Reduce任务的桥梁,Map任务的输出需要经过Shuffle过程才被作为Reduce过程的输入。Spark中的Shuffle过程与Hadoop的Shuffle过程十分类似,案例中的单词You存在于RDD的两个分区,也就有可能存在于HDFS的两个节点上,而最终输出结果单词You汇集在了同一个分区,这个过程就涉及Shuffle,Shuffle过程就会产生宽依赖。reduceByKey算子是需要进行Shuffle的。
Spark应用程序提交后,Spark引擎会根据提交的代码先生成一个有向无环图DAG,即代码的执行计划,Spark基于DAG进行流程的调度。在进行调度之前,Spark会对整个流程做Stage的划分,根据DAG中的RDD的依赖关系从后往前推,当遇到Shuffle过程,也就是宽依赖的地方,就拆分出一个新的Stage,当遇到窄依赖的地方,就将父RDD加入到当前的Stage中,Spark划分Stage的依据就是宽依赖。案例代码中Spark中Stage的划分情况如图所示。
好了,感谢大家的关注,今天就分享到这里了,更多详细内容,请阅读原书或持续关注专栏。