您好,登錄后才能下訂單哦!
在本文中, 我們借由深入剖析wordcount.py, 來揭開Spark內部各種概念的面紗。我們再次回顧wordcount.py代碼來回答如下問題
對于大多數語言的Hello Word示例,都有main()函數, wordcount.py的main函數,或者說調用Spark的main() 在哪里
數據的讀入,各個RDD數據如何轉換
map與flatMap的工作機制,以及區別
reduceByKey的作用
WordCount.py 的代碼如下:
from __future__ import print_functionimport sysfrom operator import add# SparkSession:是一個對Spark的編程入口,取代了原本的SQLContext與HiveContext,方便調用Dataset和DataFrame API# SparkSession可用于創建DataFrame,將DataFrame注冊為表,在表上執行SQL,緩存表和讀取parquet文件。from pyspark.sql import SparkSessionif __name__ == "__main__": # Python 常用的簡單參數傳入 if len(sys.argv) != 2: print("Usage: wordcount <file>", file=sys.stderr) exit(-1) # appName 為 Spark 應用設定一個應用名,改名會顯示在 Spark Web UI 上 # 假如SparkSession 已經存在就取得已存在的SparkSession,否則創建一個新的。 spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() # 讀取傳入的文件內容,并寫入一個新的RDD實例lines中,此條語句所做工作有些多,不適合初學者,可以截成兩條語句以便理解。 # map是一種轉換函數,將原來RDD的每個數據項通過map中的用戶自定義函數f映射轉變為一個新的元素。原始RDD中的數據項與新RDD中的數據項是一一對應的關系。 lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0]) # flatMap與map類似,但每個元素輸入項都可以被映射到0個或多個的輸出項,最終將結果”扁平化“后輸出 counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) # collect() 在驅動程序中將數據集的所有元素作為數組返回。 這在返回足夠小的數據子集的過濾器或其他操作之后通常是有用的。由于collect 是將整個RDD匯聚到一臺機子上,所以通常需要預估返回數據集的大小以免溢出。 output = counts.collect() for (word, count) in output: print("%s: %i" % (word, count)) spark.stop()
Spark2.0中引入了SparkSession的概念,它為用戶提供了一個統一的切入點來使用Spark的各項功能,這邊不妨對照Http Session, 在此Spark就在充當Web service的角色,程序調用Spark功能的時候需要先建立一個Session。因此看到getOrCreate()就很容易理解了, 表明可以視情況新建session或利用已有的session。
spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate()
既然將Spark 想象成一個Web server, 也就意味著可能用多個訪問在進行,為了便于監控管理, 對應用命名一個恰當的名稱是個好辦法。Web UI并不是本文的重點,有興趣的同學可以參考 ?Spark Application’s Web Console
在建立SparkSession之后, 就是讀入數據并寫入到Dateset中。
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
為了更好的分解執行過程,是時候借助PySpark了, PySpark是python調用Spark的 API,它可以啟動一個交互式Python Shell。為了方便腳本調試,暫時切換到Linux執行
# pysparkPython 2.7.6 (default, Jun 22 2015, 17:58:13) [GCC 4.8.2] on linux2 Type "help", "copyright", "credits" or "license" for more information. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 17/02/23 08:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/02/23 08:30:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0 17/02/23 08:30:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException 17/02/23 08:30:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.1.0 /_/ Using Python version 2.7.6 (default, Jun 22 2015 17:58:13) SparkSession available as 'spark'.>>> ds = spark.read.text('/home/spark2.1/spark/examples/src/main/python/a.txt')>>> type(ds) <class 'pyspark.sql.dataframe.DataFrame'>>>> print ds DataFrame[value: string]>>> lines = ds.rdd
交互式Shell的好處是可以方便的查看變量內容和類型。此刻文件a.txt已經加載到lines中,它是RDD(Resilient Distributed Datasets)彈性分布式數據集的實例。
RDD在內存中的結構可以參考論文, 理解RDD有兩點比較重要:
一是RDD一種只讀、只能由已存在的RDD變換而來的共享內存,然后將所有數據都加載到內存中,方便進行多次重用。
二是RDD的數據默認情況下存放在集群中不同節點的內存中,本身提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDD partition,因為節點故障,導致數據丟了,那么RDD會自動通過自己的數據來源重新計算該partition。
為了探究RDD內部的數據內容,可以利用collect()函數, 它能夠以數組的形式,返回RDD數據集的所有元素。
>>> lines = ds.rdd>>> for i in lines.collect():... print i... Row(value=u'These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.')
lines存儲的是Row object類型,而我們希望的是對String類型進行處理,所以需要利用map api進一步轉換RDD
>>> lines_map = lines.map(lambda x: x[0])>>> for i in lines_map.collect():... print i... These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.
為了統計每個單詞的出現頻率,需要對每個單詞分別統計,那么第一步需要將上面的字符串以空格作為分隔符將單詞提取出來,并為每個詞設置一個計數器。比如 These出現次數是1, 我們期望的數據結構是['There', 1]。但是如何將包含字符串的RDD轉換成元素為類似 ['There', 1] 的RDD呢?
>>> flat_map = lines_map.flatMap(lambda x: x.split(' '))>>> rdd_map = flat_map.map(lambda x: [x, 1])>>> for i in rdd_map.collect():... print i... [u'These', 1] [u'examples', 1] [u'give', 1] [u'a', 1] [u'quick', 1]
下圖簡要的講述了flatMap 和 map的轉換過程。
transfrom.png
不難看出,map api只是為所有出現的單詞初始化了計數器為1,并沒有統計相同詞,接下來這個任務由reduceByKey()來完成。在rdd_map 中,所有的詞被視為一個key,而key相同的value則執行reduceByKey內的算子操作,因為統計相同key是累加操作,所以可以直接add操作。
>>> from operator import add>>> add_map = rdd_map.reduceByKey(add)>>> for i in add_map.collect():... print i... (u'a', 1) (u'on', 1) (u'of', 2) (u'arbitrary', 1) (u'quick', 1) (u'the', 2) (u'or', 1)>>> print rdd_map.count()26>>> print add_map.count()23
根據a.txt 的內容,可知只有 of 和 the 兩個單詞出現了兩次,符合預期。
以上的分解步驟,可以幫我們理解RDD的操作,需要提示的是,RDD將操作分為兩類:transformation與action。無論執行了多少次transformation操作,RDD都不會真正執行運算,只有當action操作被執行時,運算才會觸發。也就是說,上面所有的RDD都是通過collect()觸發的, 那么如果將上述的transformation放入一條簡練語句中, 則展現為原始wordcount.py的書寫形式。
counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add)
而真正的action 則是由collect()完成。
output = counts.collect()
至此,已經完成了對wordcount.py的深入剖析,但是有意的忽略了一些更底層的執行過程,比如DAG, stage, 以及Driver程序。
作者:或然子
鏈接:https://www.jianshu.com/p/067907b23546
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權并注明出處。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。