Spark Transformations、Action

目录

  1. Transformations
  2. Action

Transformations

Transformations的特点是lazy的,和Scala中的lazy该念一致——延迟/懒加载,
不会立刻执行,只有等待遇到第一个action才会去提交作业到Spark上。

转换算子

map 作用到每一个元素
输入:任意类型的函数,输出:泛型U类型的函数,返回RDD

1
def map[U: ClassTag](f: T => U): RDD[U]

mapPartitions 作用到每一个分区
输入:一个可迭代的类型T,输出:一个可迭代的类型U,返回RDD

1
2
3
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

mapPartitionsWithIndex 作用到每一个分区并打印分区数
输入:分区索引,可迭代的类型T,输出:可迭代的类型U,返回RDD

1
2
3
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

glom() 按分区返回数组

1
def glom(): RDD[Array[T]]

案例:

1
listRDD.glom().collect().foreach(f=>f.foreach(x => println(_)))

filter() 过滤
输入:输入一个函数T,输出:一个布尔值,返回一个RDD

1
def filter(f: T => Boolean): RDD[T]

sample() 取样
输入:是否放回的布尔值,抽出来的概率,返回一个RDD

1
2
3
4
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]

distinct(x) 去重 ==> numPartitions可指定分区
输入的必须是RDD,返回的也是一个RDD

1
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

案例:

1
val dist: RDD[Int] = listRDD3.distinct()

coalesce(x) 重点(小文件相关场景大量使用): ==> reduce数量决定最终输出的文件数,coalesce的作用是减少到指定分区数(x),减少分区是窄依赖
==> Spark作业遇到shuffle 会切分stage
输入一个分区数,返回一个重分区后的RDD

1
2
3
4
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]

案例:

1
listRDD2.coalesce(1).getNumPartitions
双value算子

zip() 拉链 ==> 不同分区和不同元素都不能用

1
def zip[A1 >: A, B, That](that: GenIterable[B])(implicit bf: CanBuildFrom[Repr, (A1, B), That]): That

zipWithIndex() 打印拉链所在的分区

1
def zipWithIndex[A1 >: A, That](implicit bf: CanBuildFrom[Repr, (A1, Int), That]): That

案例:

1
2
3
val name = List("张三", "李四", "王五")
val age = List(19, 26, 38)
val zipRDD: List[((String, Int), Int)] = name.zip(age).zipWithIndex

union() 并集 ==> 分区数相加

1
override def union[B >: A, That](that: GenSeq[B])(implicit bf: CanBuildFrom[Repr, B, That]): That

案例:

1
2
3
val list1 = List(1,2,3,4,5,6)
val list2 = List(4,5,6,7,8,8,8)
val ints: List[Int] = list1.union(list2)

intersection() 交集

1
def intersect[B >: A](that: GenSeq[B]): Repr

案例:

1
val inter: List[Int] = list1.intersect(list2)

subtract() 差集
输入的必须是RDD,返回的也是一个RDD

1
def subtract(other: RDD[T]): RDD[T]

案例:

1
val sub: RDD[Int] = listRDD2.subtract(listRDD3)

cartesian() 笛卡尔积
输入的必须是RDD,返回的也是一个RDD

1
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

案例:

1
val car = listRDD2.cartesian(listRDD3)
kv算子

mapValues 得到所有ky的函数
输入:一个函数V,输出:一个值U,返回key为K,value为U的键函数对RDD

1
def mapValues[U](f: V => U): RDD[(K, U)]

案例:

1
mapRDD.groupByKey().mapValues(_.sum).print()

sortBy(x) 降序指定-x,指定任意参数
输入键值对,指定排序的值,默认升序

1
2
3
4
5
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

sortByKey(true|false) 只能根据key排序
默认升序为true,可指定降序为false

1
2
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]

案例:

1
mapRDD.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).print()

groupByKey 返回的kv对中的函数可迭代
==>每个数据都经过shuffle,到reduce聚合,数据量大

可指定分区数,返回一个PariRDD,包含一个Key和一个可迭代的Value

1
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

案例:

1
mapRDD.groupByKey().mapValues(_.sum).print()

reduceByKey() 对value做指定的操作,直接返回函数
==>map端有Combiner先进行了一次预聚合操作,减少了网络IO传输的数据量,所以比groupByKey快
==>groupByKey的shuffle数据量明显多于reduceByKey,所以建议使用reduceByKey

输入两个值,输出一个值,返回一个PariRDD,包含一个明确的Key和一个明确的Value

1
def reduceByKey(func: (V, V) => V): RDD[(K, V)]

join()
两个RDDjoin,返回一个PariRDD包含一个key,两个Value

1
2
3
4
val mapRDD2 = sc.parallelize(List(("zhaoliu", 18), ("zhangsan", 22), ("list", 21), ("wangwu", 26)))
val mapRDD3 = sc.parallelize(List(("hongqi", "男"), ("zhangsan", "男"), ("list", "女"), ("wangwu", "男")))

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

leftOuterJoin
两个RDDjoin,返回一个PariRDD包含一个key,一个确定的左表Value值,一个Option类型的右表Value值,即可能为空

1
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

rightOuterJoin
两个RDDjoin,返回一个PariRDD包含一个key,一个确定的右表Value值,一个Option类型的左表Value值,即可能为空

1
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

fullOuterJoin
两个RDDjoin,返回一个PariRDD包含一个key,一个Option类型的右表Value值,一个Option类型的左表Value值,即都可能为空

1
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]

cogroup
作用和join类似,不同的是返回的结果是可迭代的,而join返回的是值,原因是join底层调用了cogroup

1
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

面试题:Spark Core 不使用distinct去重

1
2
val listRDD3 = sc.parallelize(List(4, 5, 6, 7, 8, 9, 9, 9, 8, 5))
listRDD3.map(x=>(x,null)).reduceByKey((x,y)=>x).map(_._1).print()

Action

first()
返回第一个元素,等于take(1)

1
def first(): T

take()
拿出指定的前N个元素,返回一个数组,结果为原始顺序

1
def take(num: Int): Array[T]

count()
返回元素数量,是个Long型

1
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

sum
求和,返回一个Double型

1
def sum(): Double

max
返回最大值,结果通过隐式转换排序过

1
def max()(implicit ord: Ordering[T]): T

min
返回最小值,结果通过隐式转换排序过

1
def min()(implicit ord: Ordering[T]): T

top()
先排降序再返回前N个元素组成的数组,字典序

1
def top(num: Int)(implicit ord: Ordering[T]): Array[T]

案例:升序排序

1
listRDD.top(3)(Ordering.by(x => -x)).foreach(println)

takeOrdered
先排降序再返回N个元素组成的数组

1
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

案例:升序排序

1
listRDD.takeOrdered(3)(Ordering.by(x => x)).foreach(println)

reduce
聚合,输入两个元素输出一个元素,类型相同

1
def reduce(f: (T, T) => T): T

foreach
循环输出

1
def foreach(f: T => Unit): Unit

foreachPartition
分区循环输出
输入的是一个可迭代的类型T,输出Unit

1
def foreachPartition(f: Iterator[T] => Unit): Unit

案例:

1
listRDD.foreachPartition(x=>x.foreach(println))

countByKey
根据key统计个数,用作检测数据倾斜

1
def countByKey(): Map[K, Long]

lookup
根据map中的键来取出相应的值的,

1
def lookup(key: K): Seq[V]

案例:

1
mapRDD.lookup("zhangsan").foreach(println)