Fork me on GitHub

Spark笔记10-demo

案例

根据几个实际的应用案例来学会sparkmap、filter、take等函数的使用

案例1

找出TOP5的值

  • filter(func):筛选出符合条件的数据
  • map(func):对传入数据执行func操作
  • sortByKey():只能对键值对进行操作,默认是升序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf=conf)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file") # 得到RDD元素,每个RDD元素都是文本文件中的一行数据(可能存在空行)

res1 = lines.filter(lambda line:(len(line.strip()) > 0) and (len(line.split(",")) == 4)) # 字符串后面的空格去掉,并且保证长度是4
res2 = res1.map(lambda x:x.split(",")[2]) # 将列表中的元素分割,取出第3个元素,仍是字符串
res3 = res2.map(lambda x:(int(x), "")) # 将字符串转成int类型,并且变成key-value形式(50, ""),value都是空格
res4 = res3.repartition(1)
res5 = res4.sortByKey(False) # sortByKey的对象必须是键值对;按照key进行降序排列,value不动
res6 = res5.map(lambda x:x[0]) # 取出第一个元素并通过take取出前5个
res7 = res6.take(5)
for a in res7:
print(a)

文件全局排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from pyspark import SparkConf, SparkContext

index = 0

def getindex():
global index
index += 1
return index

def main():
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf=conf)
lines = sc.textFile("file:///usr/local/spark/rdd/filesort/file.txt")
index = 0
res1 = lines.filter(lambda line:(len(line.strip()) > 0 ))
res2 = res1.map(lambda x: (int(x.strip()),""))
res3 = res2.repartition(1)
res4 = res3.sortByKey(True) # 升序排列
res5 = res4.map(lambda x:x[0])
res6 = res5.map(lambda x:(getindex(),x))
res6.foreach(print)
res6.saveAsFile("file:///usr/local/spark/code/rdd/filesort/result") # 结果写进目录中-

Ko8rGR.png

二次排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from operator import gt
from pyspark import SparkContext, SparkConf

class SecondarySortKey():
def __int__(self,k): # 构造函数
self.column1 = k[0]
self.column2 = k[1]

def __gt__(self,other): # 重写比较函数
if other.column1 = self.column1: # 如果第一个元素相等,表第二个
return gt(self.column2, other.column2)
else:
return gt(self.column1, other.column1) # 否则直接比较第一个


def main():
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf=conf)
rdd1 = sc.textFile("file:///usr/local/spark/rdd/filesort/file.txt")
rdd2 = rdd1.filter(lambda x:(len(x.strip()) > 0 )) # 空行消掉
rdd3 = rdd2.map(lambda x: (int(x.split(" ")[0]), int(x.split(" ")[1])), x)
rdd4 = rdd3.map(lambda x:(SecondarySortKey(x[0]), x[1]))
rdd5 = rdd4.sortByKey(False)
rdd6 = rdd5.map(lambda x: x[1])

KoRYqS.png

KoWFoQ.png

本文标题:Spark笔记10-demo

发布时间:2019年10月31日 - 20:10

原始链接:http://www.renpeter.cn/2019/10/31/Spark%E7%AC%94%E8%AE%B010-demo.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Coffee or Tea