Spark Grouped TopN

Solving a grouped TopN problem with Spark Core is easy to do, but not so easy to do with good performance.
Below we work through several ways of tackling it.
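
All five variants read the same input. Judging from the split("\t") call and the ((domain, url), 1) mapping in the code below, site.log is assumed to be a tab-separated access log whose first field is a domain and whose second field is a URL; the goal is the top 2 URLs by hit count within each domain. A hypothetical sample (not taken from the original data):

www.baidu.com    /search
www.baidu.com    /search
www.google.com   /maps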


Method 1: use reduceByKey and groupBy directly for the grouped sum and sort
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val in = "file:///home/hadoop/data/site.log"
  // Connect to the Spark master (local mode, 2 threads)
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
  val sc = new SparkContext(conf)

  val fileRDD = sc.textFile(in)

  val mapRDD = fileRDD.map(lines => {
    val words = lines.split("\t")
    ((words(0), words(1)), 1) // ((domain, url), 1)
  })

  val result = mapRDD.reduceByKey(_ + _)
    .groupBy(x => x._1._1)
    .mapValues(x => x.toList.sortBy(x => -x._2).map(x => (x._1._1, x._1._2, x._2)).take(2))
  result.foreach(println)
}

This approach is direct, but reduceByKey and groupBy each go through a shuffle, and x.toList is a very memory-hungry operation: with a large dataset it will simply OOM.
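
To make the cost concrete, here is a rough sketch of the intermediate shapes for the hypothetical sample above (counts are illustrative only):

// After reduceByKey(_ + _): one record per (domain, url) pair, first shuffle
//   ((www.baidu.com, /search), 2), ((www.google.com, /maps), 1), ...
// After groupBy(x => x._1._1): one record per domain, second shuffle,
//   holding ALL of that domain's URLs in a single Iterable
//   (www.baidu.com, Iterable(((www.baidu.com, /search), 2), ...))
// mapValues(x => x.toList ...) then materializes that whole Iterable in memory
// on one executor before sorting, which is where the OOM risk comes from.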

Method 2: filter out one group at a time using a hard-coded list of domains
def main(args: Array[String]): Unit = {
  val in = "tunan-spark-core/data/site.log"
  // Connect to the Spark master (local mode, 2 threads)
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
  val sc = new SparkContext(conf)

  val fileRDD = sc.textFile(in)

  val mapRDD = fileRDD.map(lines => {
    val words = lines.split("\t")
    ((words(0), words(1)), 1)
  })

  // Hard-coded list of the domains we want the TopN for
  val domains = Array("www.google.com", "www.ruozedata.com", "www.baidu.com")

  for (domain <- domains) {
    mapRDD.filter(x => x._1._1.equals(domain))
      .reduceByKey(_ + _)
      .sortBy(x => -x._2)
      .take(2)
      .foreach(println)
  }
}

Core idea: list the group keys up front and use filter to process one group per pass. This removes one shuffle, but in practice we cannot always know the group keys ahead of time.
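
A small optional tweak, not present in the original code: mapRDD is reused once per domain inside the loop, so each iteration re-reads and re-parses site.log unless the RDD is persisted. Caching it first avoids that repeated work:

// Optional: keep the parsed records in memory across the per-domain jobs
mapRDD.cache()

for (domain <- domains) {
  mapRDD.filter(x => x._1._1.equals(domain))
    .reduceByKey(_ + _)
    .sortBy(x => -x._2)
    .take(2)
    .foreach(println)
}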

Method 3: use the array returned by distinct().collect() instead of a hand-written array
def main(args: Array[String]): Unit = {
  val in = "tunan-spark-core/data/site.log"
  // Connect to the Spark master (local mode, 2 threads)
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
  val sc = new SparkContext(conf)

  val fileRDD = sc.textFile(in)

  val mapRDD = fileRDD.map(lines => {
    val words = lines.split("\t")
    ((words(0), words(1)), 1)
  })

  // Collect the distinct domains back to the driver instead of hard-coding them
  val domains = mapRDD.map(x => x._1._1).distinct().collect()

  for (domain <- domains) {
    mapRDD.filter(x => domain.equals(x._1._1))
      .reduceByKey(_ + _)
      .sortBy(x => -x._2)
      .take(2)
      .foreach(println)
  }
}
Method 4: use per-partition execution instead of the for loop
def main(args: Array[String]): Unit = {
  val in = "tunan-spark-core/data/site.log"
  // Connect to the Spark master (local mode, 2 threads)
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
  val sc = new SparkContext(conf)

  val fileRDD = sc.textFile(in)

  val mapRDD = fileRDD.map(lines => {
    val words = lines.split("\t")
    ((words(0), words(1)), 1)
  })

  val domains = mapRDD.map(x => x._1._1).distinct().collect()

  // Shuffle each domain into its own partition, then take the top 2 inside each partition
  val mapPartRDD = mapRDD.reduceByKey(new MyPartitioner(domains), _ + _).mapPartitions(partition => {
    partition.toList.sortBy(x => -x._2).take(2).iterator
  })

  mapPartRDD.foreach(println)
}

The custom partitioner class:

import org.apache.spark.Partitioner
import scala.collection.mutable

class MyPartitioner(domains: Array[String]) extends Partitioner {

  // Map each domain to its own partition index
  val map = mutable.HashMap[String, Int]()
  for (i <- 0 until domains.length) {
    map(domains(i)) = i
  }

  override def numPartitions: Int = domains.length

  override def getPartition(key: Any): Int = {
    val domain = key.asInstanceOf[(String, String)]._1
    map(domain)
  }
}
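
With this partitioner, reduceByKey sends every record of a given domain to one dedicated partition, so the mapPartitions step in Method 4 sorts exactly one domain's URLs at a time. A small driver-side sketch of how keys map to partitions (using the three domains from Method 2 purely as an illustration):

val p = new MyPartitioner(Array("www.google.com", "www.ruozedata.com", "www.baidu.com"))
p.numPartitions                              // 3
p.getPartition(("www.google.com", "/maps"))  // 0
p.getPartition(("www.baidu.com", "/search")) // 2

Note that getPartition throws NoSuchElementException for a domain that is not in the array, which is why Methods 4 and 5 build the array with distinct().collect() rather than by hand.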
Method 5: use a TreeSet instead of toList for the final sort
def main(args: Array[String]): Unit = {
  val in = "tunan-spark-core/data/site.log"
  // Connect to the Spark master (local mode, 2 threads)
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
  val sc = new SparkContext(conf)

  val fileRDD = sc.textFile(in)

  val mapRDD = fileRDD.map(lines => {
    val words = lines.split("\t")
    ((words(0), words(1)), 1)
  })

  val domains = mapRDD.map(x => x._1._1).distinct().collect()

  val ord: Ordering[((String, String), Int)] = new Ordering[((String, String), Int)]() {
    override def compare(x: ((String, String), Int), y: ((String, String), Int)): Int = {
      // Different keys with equal counts must not compare as equal,
      // otherwise the TreeSet would treat one of them as a duplicate and drop it
      if (!x._1.equals(y._1) && x._2 == y._2) {
        return 1
      }
      // Sort by count in descending order
      y._2 - x._2
    }
  }

  val treeSort = mapRDD.reduceByKey(new MyPartitioner(domains), _ + _).mapPartitions(partition => {
    val set = mutable.TreeSet.empty(ord)
    partition.foreach(x => {
      set.add(x)
      if (set.size > 2) {
        set.remove(set.lastKey) // evict the last (smallest-count) element
      }
    })
    set.toIterator
  }).collect()
  treeSort.foreach(println)
}

Using a TreeSet with a custom Ordering means that at any moment each partition holds only the small amount of data it actually needs (here, the current top 2), so memory usage is low and this is the most efficient of the five approaches.
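
As a quick sanity check of the comparator and eviction logic outside Spark (assuming the ord value and the scala.collection.mutable import from the snippet above are in scope; the counts are made up for illustration):

val set = mutable.TreeSet.empty(ord)
Seq((("www.baidu.com", "/a"), 5),
    (("www.baidu.com", "/b"), 9),
    (("www.baidu.com", "/c"), 7)).foreach { x =>
  set.add(x)
  if (set.size > 2) set.remove(set.lastKey) // drop the current smallest
}
set.foreach(println)
// prints ((www.baidu.com,/b),9) then ((www.baidu.com,/c),7)

Because compare returns a non-zero value for distinct keys with equal counts, ties are kept rather than silently collapsed; the trade-off is that this Ordering is not a strictly consistent comparator, which is acceptable here because the set is only ever used to maintain the current top 2.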