您好,登錄后才能下訂單哦!
這篇文章主要介紹“Spark數據集的過濾方法”,在日常操作中,相信很多人在Spark數據集的過濾方法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spark數據集的過濾方法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
在實際工作中,根據某個字段,對一個Spark數據集進行過濾,是一個很常見的場景,舉個例子:
一個存儲公司員工信息的數據集A,有以下三個字段:
id: Integer name: String age: Integer
現在要過濾出某些員工的id,這些id在B集合(B可能是哈希表,也可能是Spark數據集)中,過濾邏輯為:
C = A.filter(A.id in B)
有四種方法可以實現,分別為:
Filter
Map
MapPartition
Inner Join
下面是詳細介紹。
Spark的Filter變換,可以根據條件表達式、返回布爾值的過濾函數、條件字符串,對數據集進行過濾,使用方法如下:
// 1. 條件表達式A1 = A.filter(Column condition)// 2. 自定義過濾函數A1 = A.filter(FilterFunction<T> func)// 3. 條件字符串A1 = A.filter(String condition)
Filter 變換比較簡單,逐條處理記錄不論數據集大小,效率都很高,但需要能夠將用來過濾的數據集B廣播到所有的executor上。
Map變換,對數據集中每條記錄調用一個函數,返回值可以是null,也可以是相同類型或不同類型的新記錄,使用方法如下:
// encoder參數用來指定輸出類型A2 = A.map(MapFunction<T,U> func, Encoder<U> encoder)
通過Map變換實現過濾的話,只需要在Map變換中,將符合條件的記錄原樣返回,不符合條件的記錄返回null即可。
可以看到,Map變換的語義和Filter變換的語義相似,都是逐條處理記錄,但Map需要提供一個額外的Encoder,故沒有Filter簡單和優雅,且因為輸出要過濾null值,所以效率不如Filter。
MapPartitions變換,與Map變換類似,但映射函數不是在每條記錄上調用,而是在分區級別調用,使用方法如下:
// func的輸入和輸出都是Iterator類型A3 = A.map(MapPartitionsFunction<T,U> func, Encoder<U> encoder)
MapPartitions在分區級別進行操作,而不是記錄級別,因此比Filter和Map效率更高。缺點的話,首先和Map一樣,需要提供一個額外的Encoder,此外,當分區過大,超過executor所能提供的內存時,任務會失敗,因此可靠性不如Map和Filter。
以員工id相等為Inner Join的條件,然后只要A集合中的字段,同樣可以實現過濾,使用方法:
// join表達式可能為 A("id") === B("id")A4 = A.join(Dataset<?> B, Column joinExprs)
Inner Join和Filter一樣,效率和可靠性都有保證,且對B集合的類型和大小都沒有偏好。
到此,關于“Spark數據集的過濾方法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。