SparkSQL 基础
SparkSQL 基础内容
认识SparkSQL
- SparkSQL的版本变更
1
2
3
4
5
6
7
8
9
10
11
12
13
141.0以前:
Shark
1.1.x开始:
SparkSQL(只是测试性的) SQL
1.3.x:
SparkSQL(正式版本)+Dataframe
1.5.x:
SparkSQL 钨丝计划
1.6.x:
SparkSQL+DataFrame+DataSet(测试版本)
2.x.x:
SparkSQL+DataFrame+DataSet(正式版本)
SparkSQL:还有其他的优化
StructuredStreaming(DataSet) - 什么是SparkSQL?
spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。 - SparkSQL的作用
提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎
DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD - 运行原理
将 Spark SQL 转化为 RDD, 然后提交到集群执行 - 特点
- 容易整合
- 统一的数据访问方式
- 兼容 Hive
- 标准的数据连接
- spark sql
spark-sql是一个Spark专属的SQL命令行交互工具,在使用spark-sql之前要把hive-site.xml 拷贝到Spark/Conf下,spark-sql和spark-shell用法一样,但是在引入外部依赖的时候,spark-sql需要用–jars和–driver-class-path同时引入依赖才不会报错 - 持久化
在spark-sql中的持久化Table命令是: cache table xxx,清除持久化 uncache table xxx
spark-SQL中的cache和uncache都是eager的,立即执行的
考点:RDD和SparkSQL的cache有什么区别?
RDD中的cache是lazy的 spark-SQL中的cache是eager的 - 遗留问题
–files/–jars 传进去的东西清不掉
SparkSession
SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。
在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
特点:
- 为用户提供一个统一的切入点使用Spark 各项功能
- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序
- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互
- 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中
DataFrame
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

DataFrame的read和write
json
- 数据的读取[DataFrameReader]select方法用于选择要输出的列,推介使用 $”col” 和 “col” 的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21object rdd2df {
def main(args: Array[String]): Unit = {
val in = "data/people.json"
val spark = SparkSession
.builder()
.master("local[2]")
.appName(this.getClass.getSimpleName)
.config("spark.some.config.option", "some-value")
.getOrCreate()
// 读取json数据
val df: DataFrame = spark.read.format("json").load(in)
// 使用$"" 导入隐式转换
import spark.implicits._
// 可以使用UDF
df.select($"name",$"age").show(2,false)
// 不可以使用UDF 适合大部分场景
df.select("name","age").show()
// 不推介,写着复杂
df.select(df("name"),df("age")).show(2)
}
}
- 使用select可以选取打印的列,空值为null
- show()默认打印20条数据,可以指定条数
- truncate默认为true,截取长度,可以设置为false
select方法有三种不同的写法,fliter也有printSchema()方法可以查看数据的Schema信息1
2
3df.select(df("name"),df("age")).filter('name === "Andy").show() //推介使用
df.select(df("name"),df("age")).filter(df("name") === "Andy").show()
df.select(df("name"),df("age")).filter("name = 'Andy'").show()1
2
3
4
5df.printSchema()
------------------------------------------------
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
- truncate默认为true,截取长度,可以设置为false
- 数据的存储[DataFrameWriter]这里需要知道的一个概念是Save Modes
1
2
3val selectDf: DataFrame = df.select($"name", $"age")
// 写出json数据
selectDf.write.format("json").mode("overwrite").save(out)
Save操作可以选择使用SaveMode,它指定目标如果存在,如何处理现有数据。重要的是要认识到,这些保存模式不利用任何锁定,也不是原子性的。此外,在执行覆盖时,在写入新数据之前将删除数据。
text
- 数据的读取文本数据读进来的一行在一个字段里面,所以要使用map算子,在map中split
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37object text2df {
def main(args: Array[String]): Unit = {
val in = "data/people.txt"
val out = "out"
val spark = SparkSession
.builder()
.master("local[2]")
.appName(this.getClass.getSimpleName)
.config("spark.some.config.option", "some-value")
.getOrCreate()
import spark.implicits._
//DataFrame不能直接split,且调用map返回的是一个Dataset
val df: DataFrame = spark.read.format("text").load(in)
val mapDF: Dataset[(String, String)] = df.map(row => {
val words = row.toString().split(",")
(words(0), words(1))
})
mapDF.show()
//DataFrame转换为RDD后,再toDF,返回的是一个DataFrame
val mapRDD2DF: DataFrame = df.rdd.map(row => {
val words = row.toString().split(",")
(words(0), words(1))
}).toDF()
mapRDD2DF.show()
//使用textFile方法读取文本文件直接返回的是一个Dataset
val ds: Dataset[String] = spark.read.textFile(in)
val mapDs: Dataset[(String, String)] = ds.map(row => {
val words = row.split(",")
(words(0), words(1))
})
mapDs.show()
}
}
- 直接read.format()读进来的是DataFrame,map中不能直接split
- DataFrame通过.rdd的方式转换成RDD,map中也不能直接split
- 通过read.textFile()的方式读进来的是Dataset,map中可以split
- 数据的存储文本数据写出去的时候
1
2
3
4
5
6
7
8val df: DataFrame = spark.read.format("text").load(in)
val mapDF = df.map(row => {
val words = row.toString().split(",")
// 拼接成一列
words(0) +","+words(1)
})
mapDF.write.format("text").mode("overwrite").save(out)
- 不支持int类型,如果存在int类型,会报错,解决办法是toString,转换成字符串
- 只能作为一列输出,如果是多列,会报错,解决办法是拼接起来,组成一列
文本数据压缩输出,只要是Spark支持的压缩的格式,都可以指定1
2
3
4
5
6mapDF.write
.format("text")
// 添加压缩操作
.option("compression","gzip")
.mode("overwrite")
.save(out)
csv
- 数据的读取csv读取数据注意使用几个参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21object csv2df {
def main(args: Array[String]): Unit = {
val in = "data/people.csv"
val out = "out"
val spark = SparkSession
.builder()
.master("local[2]")
.appName(this.getClass.getSimpleName)
.config("spark.some.config.option", "some-value")
.getOrCreate()
val df: DataFrame = spark.read
.format("csv")
.option("header", "true")
.option("sep", ";")
.option("interSchema","true")
.load(in)
df.show()
}
}
- 指定表头:option(“header”, “true”)
- 指定分隔符:option(“sep”, “;”)
- 类型自动推测:option(“interSchema”,”true”)
jdbc
在操作jdbc之前要导入两个依赖,一个是mysql-jdbc,用来连接mysql,一个是config,用来解决硬编码的问题
依赖:
1 | <dependency> |
application.conf文件
1 | db.default.driver="com.mysql.jdbc.Driver" |
数据的读取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36object mysql2df {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[2]")
.appName(this.getClass.getSimpleName)
.config("spark.some.config.option", "some-value")
.getOrCreate()
//获取配置文件中的值,db.default开头
val conf = ConfigFactory.load()
val driver = conf.getString("db.default.driver")
val url = conf.getString("db.default.url")
val user = conf.getString("db.default.user")
val password = conf.getString("db.default.password")
val source = conf.getString("db.default.source")
val target = conf.getString("db.default.target")
val db = conf.getString("db.default.db")
//读取数据库的内容
val df: DataFrame = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", s"$db.$source") //库名.源表
.option("user", user)
.option("password", password)
.option("driver", driver)
.load()
//使用DataFrame创建临时表提供spark.sql查询
df.createOrReplaceTempView("phone_type_dist")
//spark.sql写SQL返回一个DataFrame
val sqlDF: DataFrame = spark.sql("select * from phone_type_dist where phoneSystemType = 'IOS'")
}
}使用df.createOrReplaceTempView()方法创建一个DataFrame数据生成的临时表,提供spark.sql()使用SQL操作数据,返回的也是一个DataFrame
数据的存储
1
2
3
4
5
6
7
8
9//接着上面返回的sqlDF: DataFrame
sqlDF.write
.format("jdbc")
.option("url", url)
.option("dbtable", s"$db.$target") //库名.目标表
.option("user", user)
.option("password", password)
.option("driver",driver)
.save()