Spark Multi-Directory Output

Implement a custom output format class for multi-directory output. The key of each record determines the subdirectory it is written to.


import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class MyMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  // Key actually written to the output file: we only want the value in
  // the file contents, so suppress the key with NullWritable.
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()

  // Value actually written to the output file: the original line as a String.
  override def generateActualValue(key: Any, value: Any): Any = {
    value.asInstanceOf[String]
  }

  // File name for each record, relative to the output path: use the key as
  // a subdirectory and keep the task's default part file name.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    s"$key/$name"
  }
}
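With this format, a record whose key is, say, beijing and which is processed by the task writing part-00000 lands in <outputPath>/beijing/part-00000; because the key is replaced with NullWritable, the file contains only the values.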
In the driver, save the pair RDD with the saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass) method, passing the custom format as the output format class:
import org.apache.spark.rdd.RDD

object MultipleDirectory {
  def main(args: Array[String]): Unit = {
    val out = "spark-core/out"
    val sc = ContextUtils.getSparkContext(this.getClass.getSimpleName)
    // Delete the output path if it already exists (project helper).
    CheckHDFSOutPath.ifExistsDeletePath(sc.hadoopConfiguration, out)

    // Read the input files and turn each line into a key/value pair,
    // keyed by the 13th comma-separated field.
    val lines = sc.textFile("spark-core/ip/access-result/*")
    val mapRDD: RDD[(String, String)] = lines.map(line => {
      val words = line.split(",")
      (words(12), line)
    })

    // Save to one subdirectory per key using the custom output format.
    mapRDD.saveAsHadoopFile(out,
      classOf[String],
      classOf[String],
      classOf[MyMultipleTextOutputFormat])

    sc.stop()
  }
}
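To confirm the layout, here is a minimal verification sketch (assuming it runs in the same application, before sc.stop()) that lists the per-key subdirectories created under the output path:

import org.apache.hadoop.fs.{FileSystem, Path}

// One subdirectory should appear per distinct value of the key field.
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.listStatus(new Path(out))
  .filter(_.isDirectory)
  .foreach(status => println(status.getPath.getName))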
Result

Multi-directory output: under spark-core/out there is now one subdirectory per distinct key, and the part files inside each subdirectory contain only the original lines for that key.