您好,登錄后才能下訂單哦!
本篇內容介紹了“如何通過map操作看RDD的Map過程”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
RDD中的map,flatMap等操作是怎么串在一起形成DAG圖的呢?這是個很重要的問題,理解了這一點才能更好的理解Spark的內核實現。本文通過map過程來試圖解釋這一點。
先看看RDD的一個子類:MapPartitionsRDD,它會用在map函數場景下。
它的定義:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false, isFromBarrier: Boolean = false, isOrderSensitive: Boolean = false) extends RDD[U](prev)
prev是父RDD,就是父類RDD的入參,在后面的代碼里就是firstParent。
F代表了map函數的定義,其中第二個Int參數是分區索引號。我們先不管這個f入參怎么傳進來的,先看看MapPartitionsRDD需要做哪些事。
前面說過,對于RDD來說,最重要的函數就是compute,MapPartitionsRDD的compute方法定義:
override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))
很明確,就是用當前的solit分區來執行入參的f函數!
那么,這個MapPartitionsRDD是怎么產生的呢?原來是在RDD類中的map函數產生的:
def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF)) }
這幾行代碼什么意思?這里還是需要好好分析一下的。
對照MapPartitionsRDD的定義,我們知道:
(_, _, iter) => iter.map(cleanF)
里面的_,_代表TaskContext和分區索引,因為在MapPartitionsRDD的compute方法中已經有了split入參和context入參,所以在RDD中就不需要傳這兩個參數了。
iter代表要處理的數據集,在MapPartitionsRDD中的compute方法中定義為:
firstParent[T].iterator(split, context)
函數就是第一個父類RDD的split分區的數據集。這里就很清楚了,對這個數據集做cleanF操作(也就是sc.clean之后的map函數,sc.clean是去掉不能序列號的字節碼的意思,保證可以序列化后分發到其他節點執行)。
“如何通過map操作看RDD的Map過程”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。