91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RDD血緣關系源碼詳解!

發布時間:2020-07-29 18:28:40 來源:網絡 閱讀:830 作者:Stitch_x 欄目:大數據
一、RDD的依賴關系

RDD的依賴關系分為兩類:寬依賴和窄依賴。我們可以這樣認為:

  • (1)窄依賴:每個parent RDD 的 partition 最多被 child RDD 的一個partition 使用。
  • (2)寬依賴:每個parent RDD partition 被多個 child RDD 的partition 使用。

窄依賴每個 child RDD 的 partition 的生成操作都是可以并行的,而寬依賴則需要所有的 parent RDD partition shuffle 結果得到后再進行。

二、org.apache.spark.Dependency.scala 源碼解析

Dependency是一個抽象類:

// Denpendency.scala
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

它有兩個子類:NarrowDependency 和 ShuffleDenpendency,分別對應窄依賴和寬依賴。

(1)NarrowDependency也是一個抽象類

定義了抽象方法getParents,輸入partitionId,用于獲得child RDD 的某個partition依賴的parent RDD的所有 partitions。

// Denpendency.scala
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {  
/**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

窄依賴又有兩個具體的實現:OneToOneDependency和RangeDependency。
(a)OneToOneDependency指child RDD的partition只依賴于parent RDD 的一個partition,產生OneToOneDependency的算子有map,filter,flatMap等。可以看到getParents實現很簡單,就是傳進去一個partitionId,再把partitionId放在List里面傳出去。

// Denpendency.scala
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
        (b)RangeDependency指child RDD partition在一定的范圍內一對一的依賴于parent RDD partition,主要用于union。

// Denpendency.scala
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)  
  extends NarrowDependency[T](rdd) {//inStart表示parent RDD的開始索引,outStart表示child RDD 的開始索引
  override def getParents(partitionId: Int): List[Int] = {    
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)//表示于當前索引的相對位置
    } else {
      Nil
    }
  }
}
(2)ShuffleDependency指寬依賴

表示一個parent RDD的partition會被child RDD的partition使用多次。需要經過shuffle才能形成。

// Denpendency.scala
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],    
    val partitioner: Partitioner,    
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {  //shuffle都是基于PairRDD進行的,所以傳入的RDD要是key-value類型的
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)  //獲取shuffleId
  val shuffleId: Int = _rdd.context.newShuffleId()  //向shuffleManager注冊shuffle信息
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

由于shuffle涉及到網絡傳輸,所以要有序列化serializer,為了減少網絡傳輸,可以map端聚合,通過mapSideCombine和aggregator控制,還有key排序相關的keyOrdering,以及重輸出的數據如何分區的partitioner,還有一些class信息。Partition之間的關系在shuffle處戛然而止,因此shuffle是劃分stage的依據。

三、兩種依賴的區分

首先,窄依賴允許在一個集群節點上以流水線的方式(pipeline)計算所有父分區。例如,逐個元素地執行map、然后filter操作;而寬依賴則需要首先計算好所有父分區數據,然后在節點之間進行Shuffle,這與MapReduce類似。第二,窄依賴能夠更有效地進行失效節點的恢復,即只需重新計算丟失RDD分區的父分區,而且不同節點之間可以并行計算;而對于一個寬依賴關系的Lineage圖,單個節點失效可能導致這個RDD的所有祖先丟失部分分區,因而需要整體重新計算。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

崇左市| 衡南县| 锡林郭勒盟| 嘉禾县| 白玉县| 武鸣县| 雷波县| 永丰县| 关岭| 靖远县| 韩城市| 岱山县| 措勤县| 海伦市| 安塞县| 浑源县| 黄梅县| 理塘县| 教育| 宝兴县| 阿克苏市| 崇明县| 东台市| 嘉定区| 大田县| 宁波市| 临安市| 广平县| 博乐市| 白银市| 岑巩县| 宁明县| 桐城市| 台中县| 孝感市| 张北县| 务川| 兴国县| 高台县| 平和县| 独山县|