您好,登錄后才能下訂單哦!
學習spark任何技術之前,請先正確理解spark,可以參考:正確理解spark
以下對RDD的三種創建方式、單類型RDD基本的transformation api、采樣Api以及pipe操作進行了python api方面的闡述
一、RDD的三種創建方式
從穩定的文件存儲系統中創建RDD,比如local fileSystem或者hdfs等,如下:
""" 創建RDD的方法: 1: 從一個穩定的存儲系統中,比如hdfs文件, 或者本地文件系統 """ text_file_rdd = sc.textFile("file:////Users/tangweiqun/spark-course/word.txt") print "text_file_rdd = {0}".format(",".join(text_file_rdd.collect()))
2. 可以經過transformation api從一個已經存在的RDD上創建一個新的RDD,以下是map這個轉換api
""" 2: 從一個已經存在的RDD中, 即RDD的transformation api """ map_rdd = text_file_rdd.map(lambda line: "{0}-{1}".format(line, "test")) print "map_rdd = {0}".format(",".join(map_rdd.collect()))
3. 從一個內存中的列表數據創建一個RDD,可以指定RDD的分區數,如果不指定的話,則取所有Executor的所有cores數量
""" 3: 從一個已經存在于內存中的列表, 可以指定分區,如果不指定的話分區數為所有executor的cores數,下面的api時指定了2個分區 """ parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2) print "parallelize_rdd = {0}".format(parallelize_rdd.glom().collect())
注:對于第三種情況,scala中還提供了makeRDD api,這個api可以指定創建RDD每一個分區所在的機器,這個api的原理詳見spark core RDD scala api中
二、單類型RDD基本的transformation api
先基于內存中的數據創建一個RDD
conf = SparkConf().setAppName("appName").setMaster("local") sc = SparkContext(conf=conf) parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2)
map操作,表示對parallelize_rdd的每一個元素應用我們自定義的函數接口,如下是將每一個元素加1:
map_rdd = parallelize_rdd.map(lambda x: x + 1) """ 結果:[[2, 3], [4, 4, 5]] """ print "map_rdd = {0}".format(map_rdd.glom().collect())
需要注意的是,map操作可以返回與RDD不同類型的數據,如下,返回一個String類型對象:
map_string_rdd = parallelize_rdd.map(lambda x: "{0}-{1}".format(x, "test")) """ 結果:[['1-test', '2-test'], ['3-test', '3-test', '4-test']] """ print "map_string_rdd = {0}".format(map_string_rdd.glom().collect())
2. flatMap操作,對parallelize_rdd的每一個元素應用我們自定義的lambda函數,這個函數的輸出是一個數據列表,flatMap會對這些輸出的數據列表進行展平
flatmap_rdd = parallelize_rdd.flatMap(lambda x: range(x)) """ 結果:[[0, 0, 1], [0, 1, 2, 0, 1, 2, 0, 1, 2, 3]] """ print "flatmap_rdd = {0}".format(flatmap_rdd.glom().collect())
3. filter操作,對parallelize_rdd的每一個元素應用我們自定義的過濾函數,過濾掉我們不需要的元素,如下,過濾掉不等于1的元素:
filter_rdd = parallelize_rdd.filter(lambda x: x != 1) """ 結果:[[2], [3, 3, 4]] """ print "filter_rdd = {0}".format(filter_rdd.glom().collect())
4. glom操作,查看parallelize_rdd每一個分區對應的元素數據
glomRDD = parallelize_rdd.glom() """ 結果:[[1, 2], [3, 3, 4]] 說明parallelize_rdd有兩個分區,第一個分區中有數據1和2,第二個分區中有數據3,3和4 """ print "glomRDD = {0}".format(glomRDD.collect())
5. mapPartitions操作,對parallelize_rdd的每一個分區的數據應用我們自定義的函數接口方法,假設我們需要為每一個元素加上一個初始值,而這個初始值的獲取又是非常耗時的,這個時候用mapPartitions會有非常大的優勢,如下:
//這是一個初始值獲取的方法,是一個比較耗時的方法 def get_init_number(source): print "get init number from {0}, may be take much time........".format(source) time.sleep(1) return 1 def map_partition_func(iterator): """ 每一個分區獲取一次初始值,integerJavaRDD有兩個分區,那么會調用兩次getInitNumber方法 所以對應需要初始化的比較耗時的操作,比如初始化數據庫的連接等,一般都是用mapPartitions來為對每一個分區初始化一次,而不要去使用map操作 :param iterator: :return: """ init_number = get_init_number("map_partition_func") yield map(lambda x : x + init_number, iterator) map_partition_rdd = parallelize_rdd.mapPartitions(map_partition_func) """ 結果:[[[2, 3]], [[4, 4, 5]]] """ print "map_partition_rdd = {0}".format(map_partition_rdd.glom().collect()) def map_func(x): """ 遍歷每一個元素的時候都會去獲取初始值,這個integerJavaRDD含有5個元素,那么這個getInitNumber方法會被調用4次,嚴重的影響了時間,不如mapPartitions性能好 :param x: :return: """ init_number = get_init_number("map_func") return x + init_number map_rdd_init_number = parallelize_rdd.map(map_func) """ 結果:[[2, 3], [4, 4, 5]] """ print "map_rdd_init_number = {0}".format(map_rdd_init_number.glom().collect())
6. mapPartitionsWithIndex操作,對parallelize_rdd的每一個分區的數據應用我們自定義的函數接口方法,在應用函數接口方法的時候帶上了分區信息,即知道你當前處理的是第幾個分區的數據
def map_partition_with_index_func(partition_index, iterator): yield (partition_index, sum(iterator)) map_partition_with_index_rdd = parallelize_rdd.mapPartitionsWithIndex(map_partition_with_index_func) """ 結果:[[(0, 3)], [(1, 10)]] """ print "map_partition_with_index_rdd = {0}".format(map_partition_with_index_rdd.glom().collect())
三、采樣Api
先基于內存中的數據創建一個RDD
conf = SparkConf().setAppName("appName").setMaster("local") sc = SparkContext(conf=conf) parallelize_rdd = sc.parallelize([1, 2, 3, 3, 4], 2)
sample
""" 第一個參數為withReplacement 如果withReplacement=true的話表示有放回的抽樣,采用泊松抽樣算法實現 如果withReplacement=false的話表示無放回的抽樣,采用伯努利抽樣算法實現 第二個參數為:fraction,表示每一個元素被抽取為樣本的概率,并不是表示需要抽取的數據量的因子 比如從100個數據中抽樣,fraction=0.2,并不是表示需要抽取100 * 0.2 = 20個數據, 而是表示100個元素的被抽取為樣本概率為0.2;樣本的大小并不是固定的,而是服從二項分布 當withReplacement=true的時候fraction>=0 當withReplacement=false的時候 0 < fraction < 1 第三個參數為:reed表示生成隨機數的種子,即根據這個reed為rdd的每一個分區生成一個隨機種子 """ sample_rdd = parallelize_rdd.sample(False, 0.5, 100) """ 結果:[[1], [3, 4]] """ print "sample_rdd = {0}".format(sample_rdd.glom().collect())
2. randomSplit
""" //按照權重對RDD進行隨機抽樣切分,有幾個權重就切分成幾個RDD //隨機抽樣采用伯努利抽樣算法實現, 以下是有兩個權重,則會切成兩個RDD """ split_rdds = parallelize_rdd.randomSplit([0.2, 0.8]) print len(split_rdds) """[[], [3, 4]]""" print "split_rdds[0] = {0}".format(split_rdds[0].glom().collect()) """[[1, 2], [3]]""" print "split_rdds[1] = {0}".format(split_rdds[1].glom().collect())
3. takeSample
""" //隨機抽樣指定數量的樣本數據 //第一個參數為withReplacement //如果withReplacement=true的話表示有放回的抽樣,采用泊松抽樣算法實現 //如果withReplacement=false的話表示無放回的抽樣,采用伯努利抽樣算法實現 //第二個參數指定多少,則返回多少個樣本數 """ """隨機抽樣指定數量的樣本數據 結果:[1] """ print parallelize_rdd.takeSample(False, 1)
4. 分層采樣,對key-value類型的RDD進行采樣
"""創建一個key value類型的RDD""" pair_rdd = sc.parallelize([('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)]) sampleByKey_rdd = pair_rdd.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2}) """ 結果:[[('A', 1), ('B', 2), ('B', 4)]] """ print "sampleByKey_rdd = {0}".format(sampleByKey_rdd.glom().collect())
抽樣的原理詳細可以參考:spark core RDD api。這些原理性的東西用文字不太好表述
四、pipe,表示在RDD執行流中的某一步執行其他的腳本,比如python或者shell腳本等
conf = SparkConf().setAppName("appName").setMaster("local") sc = SparkContext(conf=conf) parallelize_rdd = sc.parallelize(["test1", "test2", "test3", "test4", "test5"], 2) """ //如果是在真實的spark集群中,那么要求echo.py在集群的每一臺機器的同一個目錄下面都要有 //第二個參數是環境變量 """ pipe_rdd = parallelize_rdd.pipe("python /Users/tangweiqun/spark/source/spark-course/spark-rdd-java/src/main/resources/echo.py", {"env":"env"}) """ 結果:slave1-test1-env slave1-test2-env slave1-test3-env slave1-test4-env slave1-test5-env """ print "pipe_rdd = {0}".format(" ".join(pipe_rdd.collect()))
echo.py的內容如下:
import sys import os #input = "test" input = sys.stdin env_keys = os.environ.keys() env = "" if "env" in env_keys: env = os.environ["env"] for ele in input: output = "slave1-" + ele.strip('\n') + "-" + env print (output) input.close
對于pipe的原理,以及怎么實現的,參考:spark core RDD api,這個里面還清楚的講述了怎么消除手工將腳本拷貝到每一臺機器中的工作
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。