
Spark Notes 16: DStream Basics and Operations

DStream

Stateless transformation operations

  • map: applies a function to every element and returns the results as a new DStream
  • flatMap: applies a function to every element and then flattens the results into individual elements
  • filter: keeps only the elements that satisfy a predicate
  • repartition: changes the number of partitions, and with it the DStream's degree of parallelism
  • reduce: aggregates the elements of each RDD with a function, returning a DStream of single-element RDDs
  • count: counts the number of elements in each RDD
  • union: merges two DStreams
  • reduceByKey: groups records by key and aggregates each group's values with func
  • join: for each matching key K, combines the two values V into a tuple (a chained example of several of these operations follows this list)
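A minimal sketch of several of these stateless operations chained together on a socket DStream; the host, port, and 1-second batch interval are arbitrary assumptions, and each batch is counted independently of the previous ones:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="StatelessOpsSketch")
ssc = StreamingContext(sc, 1)  # 1-second batches (assumption)

lines = ssc.socketTextStream("localhost", 9999)       # hypothetical socket source
words = lines.flatMap(lambda line: line.split(" "))   # flatMap: split each line and flatten into words
long_words = words.filter(lambda w: len(w) > 3)       # filter: drop short words
pairs = long_words.map(lambda w: (w, 1))              # map: turn each word into a (word, 1) pair
counts = pairs.reduceByKey(lambda a, b: a + b)        # reduceByKey: per-batch word counts

counts.pprint()
ssc.start()
ssc.awaitTermination()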

Stateful transformation operations

With stateful transformation operations, the current batch's word counts are accumulated on top of the word-count results of earlier batches, so the final result is the cumulative word count across all batches.
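To make the accumulation concrete, here is a plain-Python illustration (not Spark code) of how an update function like the one used in the updateStateByKey example below combines each batch's new values with the running total:

def update_func(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

# simulated per-batch occurrences of one word across three batches
batches = [[1, 1, 1], [1, 1], [1]]

state = None
for new_values in batches:
    state = update_func(new_values, state)
    print(state)  # prints 3, then 5, then 6 -- the running total grows across batches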

Sliding window transformation operations
  1. Two main parameters: (windowLength, slideInterval)
  • windowLength: the length of the sliding window
  • slideInterval: the interval at which the window slides
  2. Two important functions: the two variants of reduceByKeyAndWindow, without and with an inverse reduce function

    In the second variant, the added inverse function reduces the amount of computation: instead of recomputing the whole window on every slide, only the data entering and leaving the window needs to be processed.

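The shell commands below run a WindowedNetworkWordCount.py script that is not reproduced in these notes. A minimal sketch of what such a script could look like, using the reduceByKeyAndWindow variant with an inverse function, is given here; the checkpoint path, 30-second window length, and 10-second slide interval are assumptions:

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")
    ssc = StreamingContext(sc, 10)
    # the inverse-function variant of reduceByKeyAndWindow requires checkpointing
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/socket/checkpoint")

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKeyAndWindow(lambda a, b: a + b,   # add counts entering the window
                              lambda a, b: a - b,   # subtract counts leaving the window
                              30, 10)               # window length 30 s, slide every 10 s

    counts.pprint()
    ssc.start()
    ssc.awaitTermination()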

# Data-source terminal
# keep typing words such as "hadoop" and "spark"
cd /usr/local/spark/mycode/streaming/socket/
nc -lk 9999

# Stream-computation terminal
# displays the windowed word-count results as they update
cd /usr/local/spark/mycode/streaming/socket/
/usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 9999
updateStateByKey
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:  # sys.argv[0] is the script name itself
        print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
        exit(-1)  # not enough arguments, exit
    sc = SparkContext(appName="pythonStreamingStateNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # the StreamingContext drives the stream computation

    # updateStateByKey needs a checkpoint directory to store the accumulated state
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful")
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'word', 1)])

    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  # socket input source
    running_counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .updateStateByKey(updateFunc, initialRDD=initialStateRDD)

    running_counts.pprint()
    ssc.start()
    ssc.awaitTermination()

DStream output operations

Output to text files

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:  # sys.argv[0] is the script name itself
        print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
        exit(-1)  # not enough arguments, exit
    sc = SparkContext(appName="pythonStreamingStateNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # the StreamingContext drives the stream computation

    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful")
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'word', 1)])

    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  # socket input source
    running_counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .updateStateByKey(updateFunc, initialRDD=initialStateRDD)

    # each batch is saved under this path prefix, one directory per batch (output-<timestamp>)
    running_counts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/stateful/output")
    running_counts.pprint()
    ssc.start()
    ssc.awaitTermination()

Writing a DStream to MySQL

# start mysql
service mysql start
mysql -uroot -p

# create the table
use spark;
create table wordcount(word char(20), count int(4));

# install pymysql
sudo apt-get update
sudo apt-get install python3-pip
pip3 -V
sudo pip3 install PyMySQL
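Before wiring MySQL into the streaming job, it may help to check that PyMySQL and the wordcount table work with a small standalone script; the password 123456 matches the one assumed in the streaming code below and depends on the local setup:

import pymysql

# connect to the local MySQL server and the spark database
db = pymysql.connect(host="localhost", user="root", password="123456", database="spark")
cursor = db.cursor()

# insert one test row and read it back
cursor.execute("insert into wordcount(word, count) values(%s, %s)", ("test", 1))
db.commit()
cursor.execute("select * from wordcount")
print(cursor.fetchall())
db.close()

The streaming job below writes each batch's word counts into this table.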
from __future__ import print_function
import sys
import pymysql
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:  # sys.argv[0] is the script name itself
        print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
        exit(-1)  # not enough arguments, exit
    sc = SparkContext(appName="pythonStreamingStateNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # the StreamingContext drives the stream computation
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful")
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'word', 1)])

    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  # socket input source
    running_counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    running_counts.pprint()

    def dbfunc(records):
        # one MySQL connection per partition
        db = pymysql.connect(host="localhost", user="root", password="123456", database="spark")
        cursor = db.cursor()

        def doinsert(p):
            sql = "insert into wordcount(word, count) values('%s', '%s')" % (str(p[0]), str(p[1]))
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()

        for item in records:
            doinsert(item)
        db.close()

    def func(rdd):
        repartitionRDD = rdd.repartition(3)
        repartitionRDD.foreachPartition(dbfunc)

    running_counts.foreachRDD(func)
    ssc.start()
    ssc.awaitTermination()
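After the job has processed a few batches, the inserted rows can be inspected in the mysql client with select * from wordcount;.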
