SparkSQL 做统计分析
SparkSQL 做统计分析的方法、存储格式的转换
SparkSQL做统计分析
- 数据
- 需求:求每个国家的每个域名的访问流量排名前2
- SQL实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38object GroupTopN {
def main(args: Array[String]): Unit = {
val in = "data/data.json"
val spark = SparkSession
.builder()
.master("local[2]")
.appName(this.getClass.getSimpleName)
.config("spark.some.config.option", "some-value")
.getOrCreate()
//读取数据
val ds = spark.read.textFile(in)
import spark.implicits._
//为生成需要的表格做准备
val df: DataFrame = ds.map(row => {
val words = row.split(",")
(words(3), words(12), words(15).toLong)
}).toDF("country", "domain", "traffic")
df.createOrReplaceTempView("access")
// 每个国家的域名流量前2
val topNSQL="""select
| *
|from (
| select
| t.*,row_number() over(partition by country order by sum_traffic desc) r
| from
| (
| select country,domain,sum(traffic) as sum_traffic from access group by country,domain
| ) t
| ) rt
|where rt.r <=2 """.stripMargin
spark.sql(topNSQL).show()
}
} - 如果只要求traffic的降序,可以使用API直接写出来
分组,求和,别名,降序注意看源码中案例仿写1
2
3//traffic降序排序
import org.apache.spark.sql.functions._
df.groupBy("country","domain").agg(sum("traffic").as("sum_traffic")).sort($"sum_traffic".desc).show() - 结果展示
1
2
3
4
5
6
7
8
9
10
11
12+----------------+-----------------+-----------+---+
| country| domain|sum_traffic| r|
+----------------+-----------------+-----------+---+
| 中国| www.bilibili.com| 24265886| 1|
| 中国|www.ruozedata.com| 4187637| 2|
| 利比亚| www.bilibili.com| 22816| 1|
| 利比亚| ruoze.ke.qq.com| 15970| 2|
| 加纳| www.bilibili.com| 138659| 1|
| 加纳|www.ruozedata.com| 17988| 2|
| 利比里亚| www.bilibili.com| 20593| 1|
| 利比里亚| ruoze.ke.qq.com| 7466| 2|
+----------------+-----------------+-----------+---+
UDF函数
数据
1
2
3蔡三 唱,跳,rap,篮球
李四 唱
王五 唱,跳需求:求出每个人的爱好的个数
提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎
DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDDSQL实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35object practice04 {
def main(args: Array[String]): Unit = {
val in = "data/data.txt"
val session = SparkSession.builder().master("local[2]")
.appName(this.getClass.getSimpleName)
.config("spark.some.config.option", "some-value")
.getOrCreate()
session.sparkContext.setLogLevel("ERROR")
import session.implicits._
val ds = session.read.textFile(in)
val df = ds.map(row => {
val words = row.split("\t")
(words(0), words(1))
}).toDF("name", "fav")
session.udf.register("length",(fav: String) => {
fav.split(",").length
})
df.createOrReplaceTempView("udf_fav")
val sql1 =
"""
|select
| name,
| fav,
| length(fav) fav_count
|from udf_fav
|""".stripMargin
session.sql(sql1).show()
}
}上面是使用SQL的解决方案,还可以使用API的方法
1
2
3
4
5
6//自定义的udf需要返回值
val loveLengthUDF: UserDefinedFunction = spark.udf.register("length", (love: String) => {
love.split(",").length
})
//df.select中传入UDF函数
df.select($"name",$"love",loveLengthUDF($"love")).show()结果展示
1
2
3
4
5
6
7+----+--------------+---------+
|name| fav|fav_count|
+----+--------------+---------+
|蔡三|唱,跳,rap,篮球| 4|
|李四| 唱| 1|
|王五| 唱,跳| 2|
+----+--------------+---------+
存储格式的转换
Spark读text文件进行清洗,清洗完以后直接以我们想要的列式存储格式输出,如果按以前的方式要经过很多复杂的步骤
用Spark的时候只需要在df.write.format(“orc”).mode().save()中指定格式即可,如orc,现在就很方便了,想转成什么格式,只要format支持就ok
1 | object text2orc { |