Fork me on GitHub

Spark笔记12-DataFrame创建、保存

DataFrame

概述

DataFrame可以翻译成数据框,让Spark具备了处理大规模结构化数据的能力。

  • 比原有RDD转化方式更加简单,获得了更高的性能
  • 轻松实现从mysqlDF的转化,支持SQL查询
  • DF是一种以RDD为基础的分布式数据集,提供了详细的结构信息。传统的RDDJava对象集合

创建

从Spark2.0开始,spark使用全新的SparkSession接口

  • 支持不同的数据加载来源,并将数据转成DF
  • DF转成SQLContext自身中的表,然后利用SQL语句来进行操作
  • 启动进入pyspark后,pyspark 默认提供两个对象(交互式环境)
    • SparkContext:sc
    • SparkSession:spark
1
2
3
4
# 创建sparksession对象
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

读取数据

1
2
3
4
5
6
df = spark.read.text("people.txt")
df = spark.read.json("people.json")
df = spark.read.parquet("people.parquet")
df.show()

spark.read.format("text").load("people.txt")

KHIZ7t.png

KHIlcQ.png

保存

1
2
3
4
5
6
7
df.write.txt("people.txt")
df.write.json("people.json")
df.write.parquet("people.parquet")

df.write.format("text").save("people.txt")
df.write.format("json").save("people.json")
df.write.format("parquet").save("people.parquet")

KHTNSU.png

DF 常见操作

1
2
3
4
5
6
df = spark.read.json("people.json")
df.printSchema() # 查看各种属性信息
df.select(df["name"], df["age"]+1).show() # 筛选出两个属性
df.filter(df["age"]>20).show() # 选择数据
df.groupBy("age").count().show() # 分组再进行统计
df.sort(df["age"].desc(), df["name"].asc()).show() # 先通过age降序,再通过name升序

RDD 转成DF

  1. 利用反射机制去推断RDD模式
  2. 用编程方式去定义RDD模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 反射机制
from pyspark.sql import Row
people = spark.sparkContext.textFile("...") \ # 读取文件
.map(lambda line:line.split(",")) \ # 将读取进来的每行数据按照逗号分隔
.map(lambda p: Row(name=p[0], age=int(p[1]))) # 生成行记录
schemaPeople=spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people") # 注册成为临时表

# 编程方式
from pyspark.sql.types import *
from pyspark.sql import Row
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
schema = StructType(fields)
lines = spark.sparkContext.textFile(
" ")

KHbAoR.png

KLPa8O.png

KHjYM6.png

KHjcsf.png

spark读取mysql数据库

安装JDBC驱动程序mysql-connector-java-5.1.4.tar.gz

1
2
3
4
5
6
7
8
9
10
11
# 存放位置
/usr/local/spark/jars

# 启动pyspark
cd /usr/local/spark
./bin/pyspark

>>> use spark;
>>> select * from student;

# 插入数据:见下图

KHvytJ.png

KHxkj0.png

本文标题:Spark笔记12-DataFrame创建、保存

发布时间:2019年11月01日 - 19:11

原始链接:http://www.renpeter.cn/2019/11/01/Spark%E7%AC%94%E8%AE%B012-DataFrame%E5%88%9B%E5%BB%BA%E3%80%81%E4%BF%9D%E5%AD%98.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Coffee or Tea