SparkStreaming 基础

目录

  1. SparkStreaming简介
  2. SparkStreaming的内部结构
  3. StreamingContext对象
  4. 离散流(DStream)
  5. IDEA开发Spark Streaming

SparkStreaming简介

Spark Streaming是核心Spark API的扩展,实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。
最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。
而且还可以在数据流上应用Spark提供的机器学习和图处理算法。

what is SparkStreaming

SparkStreaming的内部结构

在内部,它的工作原理如下。
Spark Streaming接收实时输入数据流,并将数据切分成批,然后由Spark引擎对其进行处理,最后生成“批”形式的结果流。

streaming batch process

Spark Streaming将连续的数据流抽象为discretizedstream(DStream)。在内部,DStream由一个RDD序列表示。

StreamingContext对象

初始化StreamingContext:
方式一,从SparkConf对象中创建:

1
2
3
4
//创建一个Context对象:StreamingContext
val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//指定批处理的时间间隔
val ssc = new StreamingContext(conf, Seconds(5))

方式二,从现有的SparkContext实例中创建:

1
val ssc = new StreamingContext(sc, Seconds(1))

说明:

  • appName参数是应用程序在集群UI上显示的名称。
  • master是Spark,Mesos或YARN集群的URL,或者一个特殊的“local [*]”字符串来让程序以本地模式运行。
  • 当在集群上运行程序时,不需要在程序中硬编码master参数,而是使用spark-submit提交应用程序并将master的URL以脚本参数的形式传入。但是,对于本地测试和单元测试,您可以通过“local[*]”来运行Spark Streaming程序(请确保本地系统中的cpu核心数够用)。
  • StreamingContext会内在的创建一个SparkContext的实例(所有Spark功能的起始点),你可以通过ssc.sparkContext访问到这个实例。
  • 批处理的时间窗口长度必须根据应用程序的延迟要求和可用的集群资源进行设置。

注意:

  • 一旦一个StreamingContext开始运作,就不能设置或添加新的流计算。
  • 一旦一个上下文被停止,它将无法重新启动。
  • 同一时刻,一个JVM中只能有一个StreamingContext处于活动状态。
  • StreamingContext上的stop()方法也会停止SparkContext。 要仅停止StreamingContext(保持SparkContext活跃),请将stop() 方法的可选参数stopSparkContext设置为false。
  • 只要前一个StreamingContext在下一个StreamingContext被创建之前停止(不停止SparkContext),SparkContext就可以被重用来创建多个StreamingContext。

离散流(DStream)

DiscretizedStream(DStream) 是Spark Streaming对流式数据的基本抽象。
它表示连续的数据流,这些连续的数据流可以是从数据源接收的输入数据流,也可以是通过对输入数据流执行转换操作而生成的经处理的数据流。
在内部,DStream由一系列连续的RDD表示,如下图:

streaming dstream 1

我们将一行行文本组成的流转换为单词流,具体做法为:将flatMap操作应用于名为lines的 DStream中的每个RDD上,以生成words DStream的RDD。如下图所示:
streaming dstream 2

但是DStream和RDD也有区别,下面画图说明:
streaming dstream 3

streaming dstream 4

IDEA开发SparkStreaming

要编写自己的Spark流程序,必须将以下依赖项添加到Maven项目中。

1
2
3
4
5
6
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.5</version>
<scope>provided</scope>
</dependency>
SocketFile简单的单词计数
  1. 实现代scala代码逻辑
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    def main(args: Array[String]): Unit = {

    // 拿到StreamingContext对象
    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc = new StreamingContext(conf, Seconds(5))

    dispose(ssc)

    //开启StreamingContext
    ssc.start()
    ssc.awaitTermination()
    }

    private def dispose(ssc: StreamingContext) = {
    //输入记录
    val lines = ssc.socketTextStream("hadoop", 9100)

    //逻辑处理
    val words = text.flatMap(_.split(" "))
    val pair = words.map(x => (x, 1))
    val result = pair.reduceByKey(_ + _)

    //输出记录
    result.print()
    }
  2. 使用nc发送消息
    1
    2
    3
    $ nc -lk 9100

    hello world
  3. 客户端接收消息
    1
    2
    3
    4
    5
    6
    7
    ...
    -------------------------------------------
    Time: 1357008430000 ms
    -------------------------------------------
    (hello,1)
    (world,1)
    ...

DStreams 是表示从源端接收的输入数据的数据流。
在这个简单的示例中,行是一个输入DStream,因为它表示从netcat服务器接收到的数据流。
每个输入DStream(本节后面讨论的文件流除外)都与接收方(Scala doc、Java doc)对象相关联,接收方接收来自源的数据并将其存储在Spark内存中进行处理。

注意:
Spark流应用程序需要分配足够的Core来处理接收到的数据,以及运行接收方。
设置core的数量要大于Receivers的数量。

Checkpoint维护State

什么是updateStateByKey?

  • updateStateByKey(func)可以返回一个新“state”的DStream,其中通过对键的前一个状态和键的新值应用给定的函数来更新每个键的状态。
    这可以用来维护每个键的任意状态数据。

什么是Checkpoint?

  • Checkpoint可以通过在一个容错的、可靠的文件系统中设置一个目录来启用,Checkpoint信息将被保存到这个目录中。
    这是通过使用streamingContext.checkpoint(checkpointDirectory)实现的。
  1. 下面案例也是单词计数,只不过需求变成了求当天到现在为止的单词计数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc = new StreamingContext(conf, Seconds(5))

    dispose(ssc)

    ssc.start()
    ssc.awaitTermination()
    }

    // 处理逻辑
    private def dispose(ssc: StreamingContext) = {
    val lines = ssc.socketTextStream("hadoop", 9100)

    // 设置checkpoint目录,保存offset
    ssc.checkpoint("./chk")
    // updateStateByKey:维护记录的state
    lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)
    .print()
    }

    // 实现对新值和旧值的累加
    def updateFunction(newValues: Seq[Int], oldValues: Option[Int]): Option[Int] = {
    val curr = newValues.sum
    val old = oldValues.getOrElse(0)
    val count = curr + old
    Some(count)
    }
  2. 使用nc发送消息

    1
    2
    3
    4
    $ nc -lk 9100

    a a a b b c
    a a a b b c
  3. 客户端接收消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    -------------------------------------------
    Time: 1587439170000 ms
    -------------------------------------------
    (b,2)
    (a,3)
    (c,1)

    -------------------------------------------
    Time: 1587439175000 ms
    -------------------------------------------
    (b,4)
    (a,6)
    (c,2)

上面的代码一直运行,结果可以一直累加,但是代码一旦停止运行,再次运行时,结果会不会接着上一次进行计算,上一次的计算结果丢失了,主要原因上每次程序运行都会初始化一个程序入口,而2次运行的程序入口不是同一个入口,所以会导致第一次计算的结果丢失。

第一次的运算结果状态保存在Driver里面,所以我们如果想用上一次的计算结果,我们需要将上一次的Driver里面的运行结果状态取出来,而上面的代码有一个checkpoint方法,它会把上一次Driver里面的运算结果状态保存在checkpoint的目录里面,我们在第二次启动程序时,从checkpoint里面取出上一次的运行结果状态,把这次的Driver状态恢复成和上一次Driver一样的状态。

Checkpoint维护State HA

以下代码参考官网
如果想让应用程序从驱动程序故障中恢复,我们应该重写代码,让它具备下面的功能

  • 当程序第一次启动时,它将创建一个新的StreamingContext,设置所有的流,然后调用start()。
  • 当程序在失败后重新启动时,它将从Checkpoint目录中的Checkpoint数据重新创建一个StreamingContext。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
val checkpoint = "./chk_v2"
def main(args: Array[String]): Unit = {

// 拿到StreamingContext
val ssc = StreamingContext.getOrCreate(checkpoint, functionToCreateContext)
ssc.start()
ssc.awaitTermination()
}

// 创建StreamingContext
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
val ssc = new StreamingContext(conf,Seconds(50000)) // new context

dispose(ssc)

ssc.checkpoint(checkpoint) // set checkpoint directory
ssc
}

// 处理具题的业务逻辑
private def dispose(ssc: StreamingContext) = {
val lines = ssc.socketTextStream("hadoop", 9100) // create DStreams

lines
.flatMap(_.split(" "))
.map((_, 1))
.updateStateByKey(updateFunction)
.print()
}

// 更新state
def updateFunction(newValues: Seq[Int], oldValues: Option[Int]): Option[Int] = {
val curr = newValues.sum
val old = oldValues.getOrElse(0)
val count = curr + old
Some(count)
}
源码中维护State的方法

在阅读源码中的Example模块下Streaming下的StatefulNetworkWordCount object时,发现了一种维护State的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 保存checkpoint
val checkpoint = "./chk_v3"

def main(args: Array[String]): Unit = {
// 拿到 StreamingContext
val ssc = StreamingContext.getOrCreate(checkpoint, functionToCreateContext)
ssc.start()
ssc.awaitTermination()
}

// 创建 StreamingContext
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint(checkpoint)

// 对记录做累加操作
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
if(state.isTimingOut()){
println("超时3秒没拿到数据")
}else{
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
}

// 逻辑处理
val lines = ssc.socketTextStream("hadoop", 9100)
lines
.flatMap(_.split(" "))
.map((_,1))
.mapWithState(StateSpec.function(mappingFunc)
.timeout(Seconds(3))
).print()

ssc
}