Fork me on GitHub

Spark笔记17-Structured Streaming

Structured Streaming

概述

Structured Streaming将实时数据视为一张正在不断添加数据的表。

可以把流计算等同于在一个静态表上的批处理查询,进行增量运算。

在无界表上对输入的查询将生成结果表,系统每隔一定的周期会触发对无界表的计算并且更新结果。

KOhJ4P.png

KO5AW6.png

两种处理模式

  • 1.微批处理模式(默认)

    在微批处理之前,将待处理数据的偏移量写入预写日志中。 防止故障宕机等造成数据的丢失,无法恢复。

    • 定期检查流数据源
    • 对上一批次结束后到达的新数据进行批量查询
    • 由于需要写日志,造成延迟。最快响应时间为100毫秒
  • 2.持续处理模式

    • 毫秒级响应
    • 不再根据触发器来周期性启动任务
    • 启动一系列的连续的读取、处理等长时间运行的任务
    • 异步写日志,不需要等待

    KO5GSf.png

Spark Streaming 和Structured Streaming

类别 Spark Structured
数据源 DStream,本质上是RDD DF数据框
处理数据 只能处理静态数据 能够处理数据流
实时性 秒级响应 毫秒级响应

编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# StructuredNetWordCount.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode

# 创建SparkSession对象
if __name__ == "__main__":
spark = SparkSession.builder.appName("StructuredNetworkCount").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# 创建输入数据源
lines = spark.readStream.formaat("socket").option("host", "localhost").option("port", 9999).load()

# 定义流计算过程
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordsCounts = words.groupBy("word").count()

# 启动流计算并且输出结果
query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="8 seconds")
.start() # complete 表示输出模式

query.awaitTermination()

启动执行

1
2
3
4
5
6
7
8
9
10
# 启动HDFS
cd /usr/local/hadoop
sbin/start-dfs.sh

# 新建数据源终端
nc -lk 9999 # 启动服务端;需要输入语句

# 新建流计算终端:客户端
cd /usr/local/spark/mycode/structuredstreaming/
/usr/local/spark/bin/spark-submit StructuredNetWordCount.py

输入源

  • file源(文件源)

  • Kafka源

  • socket源

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    # spark_ss_kafka_producer.py

    import string
    import random
    import time

    from kafka import KafkaProduce
    if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers =['localhost:9092'])

    while True:
    s2 = (random.choice(string.ascii_lowercase) for _ in range(2)

    word = ''.join(s2)
    value = bytearray(word,'utf-8')

    producer.send("wordcount-topic", value=vlaue).get(timecount=10) # 选定主题发送内容value

    time.sleep(0.1)
    1
    2
    3
    4
    sudo agt-get install pip3
    sudo pip3 install kafka-python
    cd /usr/local/spark/mycode/structuredstreaming/kafka/
    python3 spark_ss_kafka_producer.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # spark_ss_kafka_consumer.py

    from pyspark.sql import SparkSession

    if __name__="__main__":
    spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCount()
    spark.sparkContext.setLogLevel("WARN")

    # 读取数据源
    lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "local:9092").option("subcribe", "wordcount-topic").load().selectExpr("CAST(value ASSTRING)")
    wordCounts = lines.groupBy("values").count()

    KOR0BR.png

输出

  1. 启动流计算

    DF或者Dataset的.writeStream()方法将会返回DataStreamWriter接口,接口通过.start()真正启动流计算,接口的主要参数是:

    • format:接收者类型
    • outputMode:输出模式
    • queryName:查询的名称,可选,用于标识查询的唯一名称
    • trigger:触发间隔,可选
  2. 三种输出模式

  • append
  • complete
  • update

KOWNIP.png

  1. 输出接收器

    系统内置的接收起包含:

    • file接收器
    • Kafka接收器
    • Foreach接收器
    • Console接收器
    • Memory接收器

本文标题:Spark笔记17-Structured Streaming

发布时间:2019年11月03日 - 11:11

原始链接:http://www.renpeter.cn/2019/11/03/Spark%E7%AC%94%E8%AE%B017-Structured-Streaming.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Coffee or Tea