SparkSQL 做统计分析

SparkSQL 做统计分析的方法、存储格式的转换


SparkSQL做统计分析

  1. 数据
  2. 需求:求每个国家的每个域名的访问流量排名前2
  3. SQL实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    object GroupTopN {

    def main(args: Array[String]): Unit = {
    val in = "data/data.json"
    val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

    //读取数据
    val ds = spark.read.textFile(in)

    import spark.implicits._
    //为生成需要的表格做准备
    val df: DataFrame = ds.map(row => {
    val words = row.split(",")
    (words(3), words(12), words(15).toLong)
    }).toDF("country", "domain", "traffic")

    df.createOrReplaceTempView("access")

    // 每个国家的域名流量前2
    val topNSQL="""select
    | *
    |from (
    | select
    | t.*,row_number() over(partition by country order by sum_traffic desc) r
    | from
    | (
    | select country,domain,sum(traffic) as sum_traffic from access group by country,domain
    | ) t
    | ) rt
    |where rt.r <=2 """.stripMargin
    spark.sql(topNSQL).show()
    }
    }
  4. 如果只要求traffic的降序,可以使用API直接写出来
    分组,求和,别名,降序
    1
    2
    3
    //traffic降序排序
    import org.apache.spark.sql.functions._
    df.groupBy("country","domain").agg(sum("traffic").as("sum_traffic")).sort($"sum_traffic".desc).show()
    注意看源码中案例仿写
  5. 结果展示
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    +----------------+-----------------+-----------+---+
    | country| domain|sum_traffic| r|
    +----------------+-----------------+-----------+---+
    | 中国| www.bilibili.com| 24265886| 1|
    | 中国|www.ruozedata.com| 4187637| 2|
    | 利比亚| www.bilibili.com| 22816| 1|
    | 利比亚| ruoze.ke.qq.com| 15970| 2|
    | 加纳| www.bilibili.com| 138659| 1|
    | 加纳|www.ruozedata.com| 17988| 2|
    | 利比里亚| www.bilibili.com| 20593| 1|
    | 利比里亚| ruoze.ke.qq.com| 7466| 2|
    +----------------+-----------------+-----------+---+

UDF函数

  1. 数据

    1
    2
    3
    蔡三	唱,跳,rap,篮球
    李四 唱
    王五 唱,跳
  2. 需求:求出每个人的爱好的个数
    提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎
    DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD

  3. SQL实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    object practice04 {

    def main(args: Array[String]): Unit = {

    val in = "data/data.txt"
    val session = SparkSession.builder().master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
    session.sparkContext.setLogLevel("ERROR")
    import session.implicits._

    val ds = session.read.textFile(in)
    val df = ds.map(row => {
    val words = row.split("\t")
    (words(0), words(1))
    }).toDF("name", "fav")

    session.udf.register("length",(fav: String) => {
    fav.split(",").length
    })

    df.createOrReplaceTempView("udf_fav")

    val sql1 =
    """
    |select
    | name,
    | fav,
    | length(fav) fav_count
    |from udf_fav
    |""".stripMargin
    session.sql(sql1).show()
    }
    }
  4. 上面是使用SQL的解决方案,还可以使用API的方法

    1
    2
    3
    4
    5
    6
    //自定义的udf需要返回值
    val loveLengthUDF: UserDefinedFunction = spark.udf.register("length", (love: String) => {
    love.split(",").length
    })
    //df.select中传入UDF函数
    df.select($"name",$"love",loveLengthUDF($"love")).show()
  5. 结果展示

    1
    2
    3
    4
    5
    6
    7
    +----+--------------+---------+
    |name| fav|fav_count|
    +----+--------------+---------+
    |蔡三|唱,跳,rap,篮球| 4|
    |李四| 唱| 1|
    |王五| 唱,跳| 2|
    +----+--------------+---------+

存储格式的转换

Spark读text文件进行清洗,清洗完以后直接以我们想要的列式存储格式输出,如果按以前的方式要经过很多复杂的步骤

用Spark的时候只需要在df.write.format(“orc”).mode().save()中指定格式即可,如orc,现在就很方便了,想转成什么格式,只要format支持就ok

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
object text2orc {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[2]")
.appName(this.getClass.getSimpleName)
.config("spark.some.config.option", "some-value")
.getOrCreate()

val in = "data/people.txt"
val out = "out"

val df = spark.read.textFile(in)

import spark.implicits._

//对文本文件做处理
df.map(row => {
val words = row.split(",")
(words(0),words(1))
})
.toDF("name","age") //这一步解决了数据没有表头的问题
.write
.mode("overwrite") //save mode
.format("orc") //save format
.save(out) //save path
}
}