Spark 计数器

为什么要定义计数器?

在spark应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。


图解计数器

错误的图解
累加器错误的图解

正确的图解
累加器正确的图解

计数器种类很多,但是经常用的就是两种,longAccumulator和collectionAccumulator

需要注意的是计数器是lazy的,只有触发action才会进行计数,在不持久化的情况下重复触发action,计数器会重复累加

LongAccumulator

Accumulators 是只能通过associative和commutative操作“added”的变量,因此可以有效地并行支持。它们可用于实现计数器(如MapReduce)和Spark本身支持数字类型的累加器,程序员还可以添加对新类型的支持。

longAccumulator通过累加的方式计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
object MyLongAccumulator {

def main(args: Array[String]): Unit = {

val sc = ContextUtils.getSparkContext(this.getClass.getSimpleName)
var acc = sc.longAccumulator("计数")

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

val forRDD = rdd.map(x => {
// 计数器做累加
acc.add(1L)
})
// action操作
forRDD.count()

println(acc.value) // 9
}
}

使用longAccumulator做计数的时候要小心重复执行action导致的acc.value的变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
object MyLongAccumulatorV2{
def main(args: Array[String]): Unit = {
val sc = ContextUtils.getSparkContext(this.getClass.getSimpleName)
//生成计数器
val acc = sc.longAccumulator("计数")
val rdd = sc.parallelize(1 to 8)
val forRDD = rdd.map(x => {
//计数器做累加
acc.add(1L)
})
forRDD.count()
println(acc.value) //8
forRDD.count()
println(acc.value) //16
}
}

由于重复执行了count(),累加器的数量成倍增长,解决这种错误累加也很简单,就是在count之前调用forRDD的cache方法(或persist),这样在count后数据集就会被缓存下来,reduce操作就会读取缓存的数据集,而无需从头开始计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
object MyLongAccumulator {
def main(args: Array[String]): Unit = {
val sc = ContextUtils.getSparkContext(this.getClass.getSimpleName)
//生成计数器
val acc = sc.longAccumulator("计数")
val rdd = sc.parallelize(1 to 8)
val forRDD = rdd.map(x => {
//计数器做累加
acc.add(1L)
})
forRDD.cache().count()
println(acc.value) //8
forRDD.count()
println(acc.value) //8
}
}
CollectionAccumulator

collectionAccumulator,集合计数器,计数器中保存的是集合元素,通过泛型指定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 需求:id后三位相同的加入计数器
*/
object MyCollectionAccumulator {
def main(args: Array[String]): Unit = {
val sc = ContextUtils.getSparkContext(this.getClass.getSimpleName)

//生成集合计数器
val acc = sc.collectionAccumulator[People]("集合计数器")

//生成RDD
val rdd: RDD[People] = sc.parallelize(Array(People("tunan", 100000), People("xiaoqi", 100001), People("张三", 100222), People("李四", 100003)))

//map操作
rdd.map(x => {
val id2 = x.id.toString.reverse
//满足条件就加入计数器,
if (id2(0) == id2(1) && id2(0) ==id2(2)){
acc.add(x)
}
}).count() //触发action

println(acc.value) //[People(张三,100222), People(tunan,100000)]
}
case class People(name:String,id:Long);
}

注意事项:

  1. 计数器在Driver端定义赋初始值,计数器只能在Driver端读取最后的值,在Excutor端更新。

  2. 计数器不是一个调优的操作,因为如果不这样做,结果是错的