Spark DRR转DF的两种方式

Spark DRR转DF的两种方式分为编程方式和反射方式

序列化在分布式应用的性能中扮演着重要的角色。格式化对象缓慢,或者消耗大量的字节格式化,会大大降低计算性能。通常这是在spark应用中第一件需要优化的事情。Spark的目标是在便利与性能中取得平衡,所以提供2种序列化的选择。

在默认情况下,Spark会使用Java的ObjectOutputStream框架对对象进行序列化,并且可以与任何实现java.io.Serializable的类一起工作。您还可以通过扩展java.io.Externalizable来更紧密地控制序列化的性能。Java序列化是灵活的,但通常相当慢,并且会导致许多类的大型序列化格式。

编程方式将RDD转成DF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def programmatically(spark: SparkSession): Unit = {
// RDD=>DF时需要的隐式转换
import spark.implicits._

// 创建RDD
val rdd = spark.sparkContext.textFile("spark-sql/data/info.txt")

// STEP1: RDD[String] ==> RDD[Row]
val infoRDD: RDD[Row] = rdd.map(x => {
val splits = x.split(",")
val id = splits(0).trim.toInt
val name = splits(1).trim
val age = splits(2).trim.toInt
Row(id, name, age)
})

// STEP2: schema
val schema = StructType(
StructField("id", IntegerType, true) ::
StructField("name", StringType, false) ::
StructField("age", IntegerType, false) :: Nil)

// STEP3: createDataFrame
val df = spark.createDataFrame(infoRDD, schema)
df.printSchema()
df.show()
}

反射方式将RDD转成DF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def reflection(spark: SparkSession): Unit = {
// RDD=>DF时需要的隐式转换
import spark.implicits._

// 创建RDD
val rdd = spark.sparkContext.textFile("spark-sql/data/info.txt")

// RDD[String] => case class
val infoDF = rdd.map(x => {
val splits = x.split(",")
val id = splits(0).trim.toInt
val name = splits(1).trim
val age = splits(2).trim.toInt
Info(id, name, age)
}).toDF() // 最终转成DF

infoDF.printSchema()
infoDF.show()
}

case class Info(id: Int, name: String, age: Int)