1.Accumulator是由Driver端总体进行维护的,读取当前值也是在Driver端,各个Task在其所在的Executor上也维护了Accumulator变量,但只是局部性累加操作,运行完成后会到Driver端去合并累加结果。Accumulator有两个性质:
1、只会累加,合并即累加;
2、不改变Spark作业懒执行的特点,即没有action操作触发job的情况下累加器的值有可能是初始值。
object AccumulatorTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("test003").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
println("***********************************")
// 使用scala集合完成累加
var count1:Int = 0;
var data = Seq(1,2,3,4)
data.map(x=> count1 +=x)
println("scala集合进行累加:" + count1)
println("***********************************")
// 使用RDD累加,但是count2打印结果为0
// 使用foreach传递的是函数,driver在把变量发送到work时,work中Executor都有一份count2变量副本,
// 最后执行计算时每个Executor的count2会加上自己的x,与dirver短中定义的count2没有关系,所以打印结果是0,
var count2:Int = 0
val dataRDD: RDD[Int] = sc.parallelize(data)
dataRDD.foreach(x=> count2 +=x)
println(count2)
println("**************使用累加器*********************")
val acc: LongAccumulator = sc.longAccumulator("accumulatorTest")
dataRDD.foreach(x=>acc.add(x))
println("计算元素累积和:" + acc.value)
println("统计元素个数:" + acc.count)
println("统计元素平均值:" + acc.avg)
println("统计元素总和:" + acc.sum)
}
}