SparkSQL 基础

SparkSQL 基础内容


认识SparkSQL

  1. SparkSQL的版本变更
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    1.0以前:
    Shark
    1.1.x开始:
    SparkSQL(只是测试性的) SQL
    1.3.x:
    SparkSQL(正式版本)+Dataframe
    1.5.x:
    SparkSQL 钨丝计划
    1.6.x:
    SparkSQL+DataFrame+DataSet(测试版本)
    2.x.x:
    SparkSQL+DataFrame+DataSet(正式版本)
    SparkSQL:还有其他的优化
    StructuredStreaming(DataSet)
  2. 什么是SparkSQL?
    spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。
  3. SparkSQL的作用
    提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎
    DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD
  4. 运行原理
    将 Spark SQL 转化为 RDD, 然后提交到集群执行
  5. 特点
  • 容易整合
  • 统一的数据访问方式
  • 兼容 Hive
  • 标准的数据连接
  1. spark sql
    spark-sql是一个Spark专属的SQL命令行交互工具,在使用spark-sql之前要把hive-site.xml 拷贝到Spark/Conf下,spark-sql和spark-shell用法一样,但是在引入外部依赖的时候,spark-sql需要用–jars和–driver-class-path同时引入依赖才不会报错
  2. 持久化
    在spark-sql中的持久化Table命令是: cache table xxx,清除持久化 uncache table xxx
    spark-SQL中的cache和uncache都是eager的,立即执行的
    考点:RDD和SparkSQL的cache有什么区别?
    RDD中的cache是lazy的 spark-SQL中的cache是eager的
  3. 遗留问题
    –files/–jars 传进去的东西清不掉

SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。

在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。

SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

特点:

  1. 为用户提供一个统一的切入点使用Spark 各项功能
  2. 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序
  3. 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互
  4. 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

DataFrame

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

RDD和DataFrame的存储内容比较

DataFrame的read和write

json
  1. 数据的读取[DataFrameReader]
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    object rdd2df {
    def main(args: Array[String]): Unit = {
    val in = "data/people.json"
    val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
    // 读取json数据
    val df: DataFrame = spark.read.format("json").load(in)
    // 使用$"" 导入隐式转换
    import spark.implicits._
    // 可以使用UDF
    df.select($"name",$"age").show(2,false)
    // 不可以使用UDF 适合大部分场景
    df.select("name","age").show()
    // 不推介,写着复杂
    df.select(df("name"),df("age")).show(2)
    }
    }
    select方法用于选择要输出的列,推介使用 $”col” 和 “col” 的方法
    1. 使用select可以选取打印的列,空值为null
    1. show()默认打印20条数据,可以指定条数
    1. truncate默认为true,截取长度,可以设置为false
      select方法有三种不同的写法,fliter也有
      1
      2
      3
      df.select(df("name"),df("age")).filter('name === "Andy").show()	//推介使用
      df.select(df("name"),df("age")).filter(df("name") === "Andy").show()
      df.select(df("name"),df("age")).filter("name = 'Andy'").show()
      printSchema()方法可以查看数据的Schema信息
      1
      2
      3
      4
      5
      df.printSchema()
      ------------------------------------------------
      root
      |-- age: long (nullable = true)
      |-- name: string (nullable = true)
  1. 数据的存储[DataFrameWriter]
    1
    2
    3
    val selectDf: DataFrame = df.select($"name", $"age")
    // 写出json数据
    selectDf.write.format("json").mode("overwrite").save(out)
    这里需要知道的一个概念是Save Modes
    Save操作可以选择使用SaveMode,它指定目标如果存在,如何处理现有数据。重要的是要认识到,这些保存模式不利用任何锁定,也不是原子性的。此外,在执行覆盖时,在写入新数据之前将删除数据。
    SaveMode
text
  1. 数据的读取
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    object text2df {
    def main(args: Array[String]): Unit = {
    val in = "data/people.txt"
    val out = "out"
    val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

    import spark.implicits._

    //DataFrame不能直接split,且调用map返回的是一个Dataset
    val df: DataFrame = spark.read.format("text").load(in)
    val mapDF: Dataset[(String, String)] = df.map(row => {
    val words = row.toString().split(",")
    (words(0), words(1))
    })
    mapDF.show()

    //DataFrame转换为RDD后,再toDF,返回的是一个DataFrame
    val mapRDD2DF: DataFrame = df.rdd.map(row => {
    val words = row.toString().split(",")
    (words(0), words(1))
    }).toDF()
    mapRDD2DF.show()

    //使用textFile方法读取文本文件直接返回的是一个Dataset
    val ds: Dataset[String] = spark.read.textFile(in)
    val mapDs: Dataset[(String, String)] = ds.map(row => {
    val words = row.split(",")
    (words(0), words(1))
    })
    mapDs.show()
    }
    }
    文本数据读进来的一行在一个字段里面,所以要使用map算子,在map中split
  • 直接read.format()读进来的是DataFrame,map中不能直接split
  • DataFrame通过.rdd的方式转换成RDD,map中也不能直接split
  • 通过read.textFile()的方式读进来的是Dataset,map中可以split
  1. 数据的存储
    1
    2
    3
    4
    5
    6
    7
    8
    val df: DataFrame = spark.read.format("text").load(in)
    val mapDF = df.map(row => {
    val words = row.toString().split(",")
    // 拼接成一列
    words(0) +","+words(1)
    })

    mapDF.write.format("text").mode("overwrite").save(out)
    文本数据写出去的时候
  • 不支持int类型,如果存在int类型,会报错,解决办法是toString,转换成字符串
  • 只能作为一列输出,如果是多列,会报错,解决办法是拼接起来,组成一列
    文本数据压缩输出,只要是Spark支持的压缩的格式,都可以指定
    1
    2
    3
    4
    5
    6
    mapDF.write
    .format("text")
    // 添加压缩操作
    .option("compression","gzip")
    .mode("overwrite")
    .save(out)
csv
  1. 数据的读取
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    object csv2df {
    def main(args: Array[String]): Unit = {
    val in = "data/people.csv"
    val out = "out"

    val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

    val df: DataFrame = spark.read
    .format("csv")
    .option("header", "true")
    .option("sep", ";")
    .option("interSchema","true")
    .load(in)
    df.show()
    }
    }
    csv读取数据注意使用几个参数
  • 指定表头:option(“header”, “true”)
  • 指定分隔符:option(“sep”, “;”)
  • 类型自动推测:option(“interSchema”,”true”)
jdbc

在操作jdbc之前要导入两个依赖,一个是mysql-jdbc,用来连接mysql,一个是config,用来解决硬编码的问题

依赖:

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>

application.conf文件

1
2
3
4
5
6
7
8
9
10
11
12
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop/listener?characterEncoding=utf-8&useSSL=false"
db.default.user="root"
db.default.password="root"
db.default.source="dws_ad_phone_type_dist"
db.default.target="dws_ad_phone_type_dist_1"
db.default.db="access_dw"

# Connection Pool settings
db.default.poolInitialSize=10
db.default.poolMaxSize=20
db.default.connectionTimeoutMillis=1000
  1. 数据的读取

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    object mysql2df {

    def main(args: Array[String]): Unit = {
    val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName(this.getClass.getSimpleName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()

    //获取配置文件中的值,db.default开头
    val conf = ConfigFactory.load()
    val driver = conf.getString("db.default.driver")
    val url = conf.getString("db.default.url")
    val user = conf.getString("db.default.user")
    val password = conf.getString("db.default.password")
    val source = conf.getString("db.default.source")
    val target = conf.getString("db.default.target")
    val db = conf.getString("db.default.db")

    //读取数据库的内容
    val df: DataFrame = spark.read
    .format("jdbc")
    .option("url", url)
    .option("dbtable", s"$db.$source") //库名.源表
    .option("user", user)
    .option("password", password)
    .option("driver", driver)
    .load()
    //使用DataFrame创建临时表提供spark.sql查询
    df.createOrReplaceTempView("phone_type_dist")

    //spark.sql写SQL返回一个DataFrame
    val sqlDF: DataFrame = spark.sql("select * from phone_type_dist where phoneSystemType = 'IOS'")
    }
    }

    使用df.createOrReplaceTempView()方法创建一个DataFrame数据生成的临时表,提供spark.sql()使用SQL操作数据,返回的也是一个DataFrame

  2. 数据的存储

    1
    2
    3
    4
    5
    6
    7
    8
    9
    //接着上面返回的sqlDF: DataFrame
    sqlDF.write
    .format("jdbc")
    .option("url", url)
    .option("dbtable", s"$db.$target") //库名.目标表
    .option("user", user)
    .option("password", password)
    .option("driver",driver)
    .save()