spark核心:包括RDD、RDD算子、RDD的持久化/缓存、累加器和广播变量
学习链接:https://mp.weixin.qq.com/s/caCk3mM5iXy0FaXCLkDwYQ
在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘中,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,之前的 MapReduce 框架采用非循环式的数据流模型,把中间结果写入到 HDFS 中,带来了大量的数据复制、磁盘 IO 和序列化开销。且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象。
RDD 提供了一个抽象的数据模型,让我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换操作(函数),不同 RDD 之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 IO 和序列化开销,并且还提供了更多的 API(map/reduec/filter/groupBy…)。
参考论文:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
Resilient:弹性的,RDD 里面的中的数据可以保存在内存中或者磁盘里面;
Distributed:元素是分布式存储的,可以用于分布式计算;
Dataset:一个集合,可以存放很多元素
分区列表:一个分区/分片,即数据集的基本组成单位。对于 RDD 来说,每个分片都会被一个计算任务处理,分片数决定并行度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。
计算函数:一个函数会被作用在每一个分区。 RDD 的计算是以分片为单位的,compute 函数会被作用到每个分区上。
依赖关系:一个 RDD 会依赖于其他多个 RDD。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。(Spark 的容错机制)
分区函数(默认是hash):对于 KV 类型的 RDD 会有一个 Partitioner,即 RDD 的分区函数,默认为 HashPartitioner。
最佳位置: 一个列表,存储存取每个 Partition 的优先位置(preferred location)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照"移动数据不如移动计算"的理念,Spark 在进行任务调度的时候,会尽可能选择那些存有数据的 worker 节点来进行任务计算。
分区列表、分区函数、最佳位置,这三个属性其实说的就是数据集在哪,在哪计算更合适,如何分区;
计算函数、依赖关系,这两个属性其实说的是数据集怎么来的。
val inputUserInfoTxt = "filePath"
val userInfoRDD: RDD[String] = session.sparkContext.textFile(inputUserInfoTxt)
val rdd2=rdd1.flatMap(_.split(" "))
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
makeRDD 方法底层调用了 parallelize 方法
RDD算子分为两类:
Transformation:转换操作,返回一个新的RDD
Action:动作操作,返回值不是RDD(无返回值或者其他)
- RDD不实际存储真正计算的数据,而是记录了数据的位置,数据的转化关系
- RDD中所有的操作都是惰性求值/延迟执行的,即不会直接计算。只有当发生一个要求返回结果给Driver的Action动作时,这些转换才会真正运行
- 之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,整体运行更有效率。
转换算子 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由func函数计算后返回值为true的输入元素组成 |
flagMap(func) | 类似map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是一个单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func函数类型必须是Iterator[T]=>Iterator[U] |
mapParritionWithIndex(func) | 类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据 fraction 指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed 用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集并返回一个新的RDD |
intersection(otherDataset) | 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD |
distinct([numTask]) | 对源 RDD 进行去重后返回一个新的 RDD |
groupByKey([numTask)) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(SeqOp, combOp,[numTasks]) | 对 PairRDD 中相同的 Key 值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和 aggregate 函数类似,aggregateByKey 返回值的类型不需要和 RDD 中 value 的类型一致 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的(K,V)的 RDD |
sortBy(func,[ascending],[numTasks]) | 与 sortByKey 类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的 RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | 对 rdd 进行管道操作 |
coalesce(numPartitions) | 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作 |
repartition(numPartitions) | 重新给RDD分区 |
Action算子
动作算子 | 含义 |
---|---|
reduce(func) | 通过func函数聚集RDD中所有元素,这个功能必须是可交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前 n 个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否用随机数替换不足的部分,seed 用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
saveAsTextFile(path) | 将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统 |
saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数 |
foreach(func) | 在数据集的每一个元素上,运行函数 func 进行更新 |
foreachPartition(func) | 在数据集的每一个分区上,运行函数 func |
统计操作:
算子 | count | mean | sum | max | min | variance | sampleVariance | stdev | sampleStdev | stats |
---|---|---|---|---|---|---|---|---|---|---|
含义 | 个数 | 均值 | 求和 | 最大值 | 最小值 | 方差 | 采样的方差 | 标准差 | 采样的标准差 | 查看统计结果 |
在实际开发中某些 RDD 的计算或转换可能会比较耗费时间,如果这些 RDD 后续还会频繁的被使用到,那么可以将这些 RDD 进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
persist方法和cache方法
RDD通过persist或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立刻缓存,而是触发后面的action时,该RDD将被缓存在计算节点的内存中,并供后面重用。
RDD的cache()方法也是调用了persist方法实现。
默认的存储级别都是仅在内存存储一份,Spark 的存储级别还有好多种,存储级别在 object StorageLevel 中定义的。
持久化级别 | 说明 |
---|---|
MORY_ONLY(默认) | 将 RDD 以非序列化的 Java 对象存储在 JVM 中。如果没有足够的内存存储 RDD,则某些分区将不会被缓存,每次需要时都会重新计算。这是默认级别 |
MORY_AND_DISK(开发中可以使用这个) | 将 RDD 以非序列化的 Java 对象存储在 JVM 中。如果数据在内存中放不下,则溢写到磁盘上.需要时则会从磁盘上读取 |
MEMORY_ONLY_SER (Java and Scala) | 将 RDD 以序列化的 Java 对象(每个分区一个字节数组)的方式存储.这通常比非序列化对象(deserialized objects)更具空间效率,特别是在使用快速序列化的情况下,但是这种方式读取数据会消耗更多的 CPU |
MEMORY_AND_DISK_SER (Java and Scala) | 与 MEMORY_ONLY_SER 类似,但如果数据在内存中放不下,则溢写到磁盘上,而不是每次需要重新计算它们 |
DISK_ONLY | 将 RDD 分区存储在磁盘上 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2 等 | 与上面的储存级别相同,只不过将持久化数据存为两份,备份每个分区存储在两个集群节点上 |
OFF_HEAP(实验中) | 与 MEMORY_ONLY_SER 类似,但将数据存储在堆外内存中。(即不是直接存储在 JVM 内存中) |
持久化/缓存可以把数据放在内存中,虽然快速,但是不可靠;也可以放在磁盘的,但是也不完全可靠,例如磁盘会损坏
Checkpoint的时候一般把数据放在HDFS上,借助了 HDFS 天生的高容错、高可靠来实现数据最大程度上的安全,实现了 RDD 的容错和高可用。
SparkContext.setCheckpointDir("目录") //HDFS的目录
RDD.checkpoint
RDD 和它依赖的父 RDD 的关系有两种不同的类型,即
宽依赖(wide dependency/shuffle dependency):父RDD的一个分区只会被子RDD的一个分区依赖
窄依赖(narrow dependency):父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)
DAG指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程);
原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图,任务执行时,可以按照 DAG 的描述,执行真正的计算(数据被操作的一个过程)。
开始:通过 SparkContext 创建的 RDD;
结束:触发 Action,一旦触发 Action 就形成了一个完整的 DAG。
一个Spark程序可以有多个DAG(有几个Action,就有几个DAG)
一个DAG可以有多个Stage(根据宽依赖/shuffle进行划分)
同一个Stage可以有多个Task并行执行(task数=分区数,如上图,stage1有三个分区,对应也有三个Task)
DAG 中只 reduceByKey 操作是一个宽依赖,Spark 内核会以此为边界将其前后划分成不同的 Stage。
在图中 Stage1 中,从 textFile 到 flatMap 到 map 都是窄依赖,这几步操作可以形成一个流水线操作,通过 flatMap 操作生成的 partition 可以不用等待整个 RDD 计算结束,而是继续进行 map 操作,这样大大提高了计算的效率。
并行计算
一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。
按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。
对于窄依赖,partition 的转换处理在 stage 中完成计算,不划分(将窄依赖尽量放在在同一个 stage 中,可以实现流水线计算)。
对于宽依赖,由于有 shuffle 的存在,只能在父 RDD 处理完成后,才能开始接下来的计算,也就是说需要要划分 stage。
Spark 会根据 shuffle/宽依赖使用回溯算法来对 DAG 进行 Stage 划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到当前的 stage/阶段中
参考论文:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。
为了满足这种需求,Spark 提供了两种类型的变量:
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果:
import org.apache.spark.sql.SparkSession
object MyAccumulater {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().master("local[*]").appName("MY ACCUMULATOR").getOrCreate()
// 使用scala集合完成累加
var counter = 0
val data = Seq(1, 2, 3)
data.foreach(x => counter += x)
println(s"add $counter") // 6
// 使用RDD进行累加
var newCounter = 0
val newData = Seq(1, 2, 3)
val dataRdd = session.sparkContext.parallelize(newData)
dataRdd.foreach(x => newCounter += x)
println(s"session add $newCounter") //0
// foreach中的函数是传递给Worker中的Executor执行,用到了newCounter 变量
//而newCounter 变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2
//最后各个Executor将各自个x加到自己的newCounter 上面了,和Driver端的newCounter 没有关系
val AddCounter = session.sparkContext.longAccumulator("accumulator")
dataRdd.foreach(x => AddCounter.add(x))
println(s"accumulator add $AddCounter") //6
}
}
package com.example
import org.apache.spark.sql.SparkSession
object MyBroadcast {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder().master("local[*]").appName("MyBroadcast").getOrCreate()
val sc = session.sparkContext
sc.setLogLevel("WARN")
// 不使用广播变量
val kvFruit = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
val fruitMap = kvFruit.collectAsMap()
val fruitIds = sc.parallelize(List(2,4,1,3))
// 获取水果名
// 当数据量少的时候,这样运行没有问题
// 但是如果数据量大,Task数多,那么每个Task用到的fruitMap会被多次传输
// 应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用
val fruitNames = fruitIds.map(x => fruitMap(x))
fruitNames.foreach(println)
// 使用广播变量,将fruitMap放到各个机器上
// 注意: 广播变量的值不能被修改, 如需修改可以将数据存到外部数据源, 如MySQL、Redis
println("=====================")
val broadcastFruitMap = sc.broadcast(fruitMap)
val fruitNames2 = fruitIds.map(x => broadcastFruitMap.value(x))
fruitNames2.foreach(println)
}
}