Fork me on GitHub

MLlib

Spark MLlib

简介

MapReduce对机器学习的算法编写的缺点:

  • 反复读写磁盘
  • 磁盘IO开销大

机器学习算法中具有大量的迭代计算,导致了MapReduce不太适合。

Spark是基于内存的计算框架,使得数据尽量不存放在磁盘上,直接在内存上进行数据的操作。

MLlib只包含能够在集群上运行良好的并行算法。

KO7EAH.png

特征化工具

  • 特征提取
  • 转化
  • 降维
  • 选择工具

KOTO74.png

实现算法

MLlib实现的算法包含:

  • 分类
  • 回归
  • 聚类
  • 协同过滤

KO7m9I.png

流水线

使用Spark SQL中的DF作为数据集,可以容纳各种数据类型。DFML Pinline用来存储源数据。DF中的列可以是:

  • 文本
  • 特征向量
  • 真实和预测标签等

转换器transformer能将一个DF转换成另一个DF,增加一个标签列。

评估器estimator指的是学习算法或在训练数据上的训练方法的抽象概念,本质上就是一个算法。

参数parameter用来进行参数的设置。

流水线构建

  1. 定义pipeline中的各个流水线阶段PipelineStage,包含转换器和评估器
  2. 转换器和评估器有序的组织起来构建PipeLine
  3. 流水线本身也是估计器。在流水线的.fit()方法运行之后,产生一个PipelineModel,变成了一个Transformer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# pyspark.ml依赖numpy:sudo pip3 install numpy

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF.Tokenizer

# 构建训练数据
training = spark.createDataFrame([
(0,"a b c d e spark", 1.0),
(1,"b d", 0.0),
(2,"spark b d e", 1.0),
(3,"hadoop mapreduce",0.0)
],["id","text","label"])

# 定义每个阶段:阶段都是评估器或者转换器
tokenizer = Tokenizer(inputCol="text",outputCol="words") # 分解器
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),outputCol="features")
Lr = LogisticRegression(maxIter=10,regParam=0.001)

# 合并到流水线
pipeline = Pipeline(stages=[tokenizer,hashingTF,Ir]) # 本质上是一个评估器
model = pipeline.fit(training) # 变成了一个PipelineModel,是一个转换器

# 构建测试数据
test = spark.createDataFrame([
(4," b d e spark"),
(5,"spark d"),
(6,"spark u d "),
(7,"hadoop spark")
],["id","text"])

prediction = model.transform(test)
selected = prediction.select("id","text","probability","prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print(rid,text,str(prob),predi ction)

特征提取和转换

特征提取

TF-IDF;词频-逆向文件频率

  • TF:HashingTF是一个转换器;统计各个词条的词频
  • IDF:是一个评估器,在数据集上应用IDFfit方法,会产生一个IDFmodel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pyspark.ml.feature import HashingTF,IDF,Tokenizer

sentenceData = spark.createDataFrame([(0,"..."),(1,"..."),(0,"..."),(1,"..."),(1,"...")]).toDF("label","sentence") # ...代表是一个个句子

tokenizer = Tokenizer(inputCol="sentence",outputCol="words") # 指定分解器的两个属性
wordsData = tokenizer.transform(sentenceData)
wordsData.show()

hashingTF = HashingTF(inputCol="words",outputCol="rawFeatures",numFeatures=2000) # 把句子转成哈希特征向量
featurizedData = hashingTF.transform(wordsData)
featurizedData.select("words","rawFeatures").show(truncate=False)

# 调节权重:评估器
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)

# 训练过程:调节权重
rescaleData = idfModel.transform(featurizedData)
rescaleData.select("features", "label").show(truncate=False)

转换

将字符串转换成整数索引,或者在完成计算之后将证书索引还原成字符串标签。相关的转换器

  • StringIndexer

    • 将类别型的标签进行数值化,索引从0开始
    • 索引构建的频率是标签的频率,悠闲编码频率大的标签,即出现频率最高的标签为0号
    • 如果输入的是数值型的,会先转成字符型,再对其进行编码。
    1
    2
    3
    4
    5
    6
    7
    8
    from pyspark.ml.feature import StringIndexer
    df = spark.createDataFrame([(0,"a"),(1,"b"),(2,"c"),(3,"a"),(4,"a"),(5,"c")], ["id","category"])

    indexer = StringIndexer(inputCol="catefory",outputCol="categoryIndex")

    model = indexer.fit(df)
    indexed = model.transform(df)
    indexed.show()

    KX3oPf.png

  • IndexToString

    • 将标签索引重新映射成原有的字符型标签
    • 通常是和StringIndexer配合使用

    KX8TT1.png

  • VectorIndexer

    • 所有的特征被组织到一个向量中,想对其中的某个分量进行处理
    • 使用的maxCategories超参数。它是基于不同特征的数量来识别哪些特征需要被类别化。

本文标题:MLlib

发布时间:2019年11月03日 - 15:11

原始链接:http://www.renpeter.cn/2019/11/03/Spark%E7%AC%94%E8%AE%B018-MLlib.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Coffee or Tea