您好,登錄后才能下訂單哦!
MLlib 提供的API可以通過Pipelines將多個復雜的機器學習算法結合成單個pipeline或者單個工作流。這個概念和scikit-learn里的概念類似,根據官方的說法是,此抽象概念的設計靈感來自于scikit-learn。
· DataFrame
:通過Spark SQL 組件里的DataFrame作為機器學習的數據集。支持多種數據類型.比如 DataFrame
可以將文本,數據庫等外部數據源劃分為不同的列,包含特征向量, 特征值等。
· Transformer
: 一個 Transformer
可以將一個DataFrame
轉換成另一個DataFrame
. 比如, 一個機器學習模型可以將帶有特征值的DataFrame轉換為一個帶有模型預測結果數據的DataFrame.
· Estimator
:通過 DataFrame
數據集進行訓練
產生一個機器學習模型的算法。
· Pipeline
:聯合多個 Transformer
和 Estimator
構成一個機器學習工作流。
· Parameter
: 所有Transformer
和 Estimator
指定參數的共享API。
DataFrame里廣泛運用的數據結構,可以包含向量,文本,圖片,以及結構化數據。DataFrame通過Spark SQL支持多種數據源。
工作流程如圖所示:
機器學習中Pipleline流程圖
正如圖中所示,Pipeline有三個階段,每個階段要么是Transformer ,要么就是Estimator,這些階段按照一定的順序執行,執行的過程中,通過圓柱體代表的DataFrame類型的Raw text產生一個新的Words(DataFrame類型),最后建立了一個LogisticRegressionModel。圖中的Tokenizer,HashingTF都是Transformer,而LogisticRegressionModel是Estimator 。
在Transformer 階段,主要調用transform()方法進行計算。
在Estimator
階段,主要調用
fit()
方法進行計算。
DAG Pipelines:多個階段形成一個pipeline,同理,DAG Pipelines就是多個pipeline組成的一個有向無環圖。
運行時檢查:數據結構DataFrame中可以有各種各樣的數據,但是在編譯的時候不會檢查數據的數據類型,而是在運行的時候才根據DataFrame的Schema來檢查數據類型。
唯一ID標識:Pipeline的每一個階段(stage)都通過id來進行唯一的標識,同一個相同的實列,比如HashingTF不會插入到同一個Pipeline倆次,因為每一個stage都有自身的唯一的ID來進行標識。
代碼案例:
importorg.apache.spark.ml.classification.LogisticRegression
importorg.apache.spark.ml.linalg.{Vector,Vectors}
importorg.apache.spark.ml.param.ParamMap
importorg.apache.spark.sql.Row
// Prepare training data from a list of (label, features)tuples.
valtraining=spark.createDataFrame(Seq(
(1.0,Vectors.dense(0.0,1.1,0.1)),
(0.0,Vectors.dense(2.0,1.0,-1.0)),
(0.0,Vectors.dense(2.0,1.3,1.0)),
(1.0,Vectors.dense(0.0,1.2,-0.5))
)).toDF("label","features")
// Create a LogisticRegression instance. This instance is anEstimator.
vallr=newLogisticRegression()
// Print out the parameters, documentation, and any defaultvalues.
println("LogisticRegressionparameters:\n"+lr.explainParams()+"\n")
// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01)
// Learn a LogisticRegression model. This uses the parametersstored in lr.
valmodel1=lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced byan Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where namesare unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit usingparameters: "+model1.parent.extractParamMap)
// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
valparamMap=ParamMap(lr.maxIter->20)
.put(lr.maxIter,30) // Specify 1 Param. This overwrites the original maxIter.
.put(lr.regParam->0.1,lr.threshold->0.55) // Specify multiple Params.
// One can also combine ParamMaps.
valparamMap2=ParamMap(lr.probabilityCol->"myProbability") // Change output column name.
valparamMapCombined=paramMap++paramMap2
// Now learn a new model using the paramMapCombinedparameters.
// paramMapCombined overrides all parameters set earlier vialr.set* methods.
valmodel2=lr.fit(training,paramMapCombined)
println("Model 2 was fit usingparameters: "+model2.parent.extractParamMap)
// Prepare test data.
valtest=spark.createDataFrame(Seq(
(1.0,Vectors.dense(-1.0,1.5,1.3)),
(0.0,Vectors.dense(3.0,2.0,-0.1)),
(1.0,Vectors.dense(0.0,2.2,-1.5))
)).toDF("label","features")
// Make predictions on test data using theTransformer.transform() method.
// LogisticRegression.transform will only use the 'features'column.
// Note that model2.transform() outputs a 'myProbability'column instead of the usual
// 'probability' column since we renamed thelr.probabilityCol parameter previously.
model2.transform(test)
.select("features","label","myProbability","prediction")
.collect()
.foreach{caseRow(features:Vector,label:Double,prob:Vector,prediction:Double)=>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
Pipeline單獨的案例代碼
importorg.apache.spark.ml.{Pipeline,PipelineModel}
importorg.apache.spark.ml.classification.LogisticRegression
importorg.apache.spark.ml.feature.{HashingTF,Tokenizer}
importorg.apache.spark.ml.linalg.Vector
importorg.apache.spark.sql.Row
// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
(0L,"a b c d e spark",1.0),
(1L,"b d",0.0),
(2L,"spark f g h",1.0),
(3L,"hadoop mapreduce",0.0)
)).toDF("id","text","label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer =newTokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF =newHashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr =newLogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline =newPipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
val model = pipeline.fit(training)
// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
// And load it back in during production
val sameModel =PipelineModel.load("/tmp/spark-logistic-regression-model")
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L,"spark i j k"),
(5L,"l m n"),
(6L,"spark hadoop spark"),
(7L,"apache hadoop")
)).toDF("id","text")
// Make predictions on test documents.
model.transform(test)
.select("id","text","probability","prediction")
.collect()
.foreach{caseRow(id:Long, text:String, prob:Vector, prediction:Double)=>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。