This post walks through how Spark's dynamic partition pruning is implemented, by reading the relevant optimizer rule in the source.
This post is based on delta 0.7.0 and spark 3.0.1. Dynamic partition pruning was introduced in Spark 3.x.
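Before reading the rule itself, here is a minimal spark-shell sketch of the query shape dynamic partition pruning targets: a fact table partitioned on the join key, joined to a dimension table that carries a selective filter. The table and column names (fact_sales, dim_store, store_id, country) are made up for illustration.

// spark-shell sketch: spark is the predefined SparkSession
// fact_sales is assumed to be partitioned by store_id; dim_store is a small dimension table
val pruned = spark.sql("""
  SELECT f.*
  FROM fact_sales f
  JOIN dim_store d ON f.store_id = d.store_id
  WHERE d.country = 'CN'
""")
// with DPP, only the fact partitions whose store_id survives the
// dimension-side filter end up being scanned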
Let's jump straight to PartitionPruning.apply; PartitionPruning is an optimizer rule on the logical plan:
override def apply(plan: LogicalPlan): LogicalPlan = plan match {
  // Do not rewrite subqueries.
  case s: Subquery if s.correlated => plan
  case _ if !SQLConf.get.dynamicPartitionPruningEnabled => plan
  case _ => prune(plan)
}
If the logical plan is a subquery and that subquery is correlated, the rule skips it, because correlated subqueries will be rewritten into join conditions anyway.
If dynamic partition pruning is not enabled, the rule is skipped as well.
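For reference, the flag read by SQLConf.get.dynamicPartitionPruningEnabled is the SQL config spark.sql.optimizer.dynamicPartitionPruning.enabled, which is true by default; a minimal spark-shell sketch of toggling it:

// spark-shell sketch: spark is the predefined SparkSession
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
// setting it to "false" makes the PartitionPruning rule return the plan unchanged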
All other plans go on to the next step, which checks whether the plan contains a join; only joins are processed further:
private def prune(plan: LogicalPlan): LogicalPlan = {
  plan transformUp {
    // skip this rule if there's already a DPP subquery on the LHS of a join
    case j @ Join(Filter(_: DynamicPruningSubquery, _), _, _, _, _) => j
    case j @ Join(_, Filter(_: DynamicPruningSubquery, _), _, _, _) => j
    case j @ Join(left, right, joinType, Some(condition), hint) =>
Let's analyze it step by step: 1.
var newLeft = left
var newRight = right

// extract the left and right keys of the join condition
val (leftKeys, rightKeys) = j match {
  case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, _, _) => (lkeys, rkeys)
  case _ => (Nil, Nil)
}

// the unapply method of ExtractEquiJoinKeys
def unapply(join: Join): Option[ReturnType] = join match {
  case Join(left, right, joinType, condition, hint) =>
    logDebug(s"Considering join on: $condition")
    // Find equi-join predicates that can be evaluated before the join, and thus can be used
    // as join keys.
    val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
    val joinKeys = predicates.flatMap {
      case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
      case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
      case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
      // Replace null with default value for joining key, then those rows with null in it could
      // be joined together
      case EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, right) =>
        Seq((Coalesce(Seq(l, Literal.default(l.dataType))),
             Coalesce(Seq(r, Literal.default(r.dataType)))),
            (IsNull(l), IsNull(r)))
      case EqualNullSafe(l, r) if canEvaluate(l, right) && canEvaluate(r, left) =>
        Seq((Coalesce(Seq(r, Literal.default(r.dataType))),
             Coalesce(Seq(l, Literal.default(l.dataType)))),
            (IsNull(r), IsNull(l)))
      case other => None
    }
ExtractEquiJoinKeys splits the condition into the individual predicates separated by AND; only equality predicates move on to the next step:
If either side of an EqualTo has no column references (for example a literal), the pair is not matched.
If it is an EqualTo and each side can be evaluated by the corresponding child plan, the pair is matched as a join-key pair.
If it is an EqualNullSafe and each side can be evaluated by the corresponding child plan, it is rewritten into Coalesce (with the type's default value) plus IsNull checks.
The matched pairs are then collected into the leftKeys and rightKeys expressions. For example, if the join condition is tableA.a1 = tableB.b1 AND tableA.a2 = tableB.b2,
the result of this process is leftKeys: Seq(tableA.a1, tableA.a2) and rightKeys: Seq(tableB.b1, tableB.b2).
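To make the pairing concrete, here is a toy, REPL-style sketch of the key-extraction logic. Expr, Eq, canEvaluate and the table names are stand-ins made up for illustration; the real rule works on Catalyst expressions.

// toy model: an expression is just a (table, column) pair
case class Expr(table: String, column: String)
case class Eq(l: Expr, r: Expr)

// a side "can evaluate" an expression if the expression only references that side
def canEvaluate(e: Expr, side: String): Boolean = e.table == side

// pair each equality so that the left key always comes from the left plan
def extractKeys(predicates: Seq[Eq]): (Seq[Expr], Seq[Expr]) =
  predicates.flatMap {
    case Eq(l, r) if canEvaluate(l, "tableA") && canEvaluate(r, "tableB") => Some((l, r))
    case Eq(l, r) if canEvaluate(l, "tableB") && canEvaluate(r, "tableA") => Some((r, l))
    case _ => None
  }.unzip

// tableA.a1 = tableB.b1 AND tableA.a2 = tableB.b2
val (leftKeys, rightKeys) = extractKeys(Seq(
  Eq(Expr("tableA", "a1"), Expr("tableB", "b1")),
  Eq(Expr("tableA", "a2"), Expr("tableB", "b2"))))
// leftKeys  == Seq(Expr("tableA","a1"), Expr("tableA","a2"))
// rightKeys == Seq(Expr("tableB","b1"), Expr("tableB","b2"))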
splitConjunctivePredicates(condition).foreach {
  case EqualTo(a: Expression, b: Expression) if fromDifferentSides(a, b) =>
    val (l, r) = if (a.references.subsetOf(left.outputSet) &&
        b.references.subsetOf(right.outputSet)) {
      a -> b
    } else {
      b -> a
    }

    // there should be a partitioned table and a filter on the dimension table,
    // otherwise the pruning will not trigger
    var partScan = getPartitionTableScan(l, left)
    if (partScan.isDefined && canPruneLeft(joinType) &&
        hasPartitionPruningFilter(right)) {
      val hasBenefit = pruningHasBenefit(l, partScan.get, r, right)
      newLeft = insertPredicate(l, newLeft, r, right, rightKeys, hasBenefit)
    } else {
      partScan = getPartitionTableScan(r, right)
      if (partScan.isDefined && canPruneRight(joinType) &&
          hasPartitionPruningFilter(left)) {
        val hasBenefit = pruningHasBenefit(r, partScan.get, l, left)
        newRight = insertPredicate(r, newRight, l, left, leftKeys, hasBenefit)
      }
    }
  case _ =>
}
For each EqualTo pair, getPartitionTableScan is applied to the left-hand expression first. This method:
traces the expression down to the logical plan that ultimately produces it, and returns that plan
only when the plan is a scan over a HadoopFsRelation with partition columns and the expression references those partition columns.
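Paraphrased as a sketch, the check looks roughly like this. This is not the verbatim Spark source: traceToSourcePlan is a hypothetical stand-in for Spark's internal expression-lineage helper, and partition columns are matched by name here for simplicity.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// hypothetical stand-in: walk `plan`, find the relation that produces `expr`,
// and return the resolved expression together with that relation
def traceToSourcePlan(expr: Expression, plan: LogicalPlan): Option[(Expression, LogicalPlan)] = ???

def getPartitionTableScanSketch(expr: Expression, plan: LogicalPlan): Option[LogicalRelation] =
  traceToSourcePlan(expr, plan) match {
    case Some((resolvedExpr, scan: LogicalRelation)) =>
      scan.relation match {
        case fs: HadoopFsRelation =>
          // only a file-source scan qualifies, and the join key must reference
          // partition columns only (matched by name in this simplified sketch)
          val partitionCols = fs.partitionSchema.fieldNames.toSet
          if (resolvedExpr.references.nonEmpty &&
              resolvedExpr.references.forall(a => partitionCols.contains(a.name))) {
            Some(scan)
          } else None
        case _ => None
      }
    case _ => None
  }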
If the left-hand logical plan satisfies getPartitionTableScan, the join type allows pruning the left side (Inner/LeftSemi/RightOuter), and the right-hand plan is not a streaming plan and carries a selective filter (for example a > or < comparison), then a DynamicPruningSubquery node is inserted as the parent of the left-hand plan. Inserting the node has one more requirement: either pruningHasBenefit holds or SQLConf.get.exchangeReuseEnabled is set; SQLConf.get.exchangeReuseEnabled is true by default.
The right-hand logical plan is handled the same way, except that the join type must be Inner/LeftSemi/LeftOuter.
The logic of pruningHasBenefit is: if filterRatio * the sizeInBytes of the partitioned table scan's stats is greater than the sum of stats.sizeInBytes over all leaf nodes of the filtering side's logical plan, then adding the DynamicPruningSubquery is considered worthwhile.
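A made-up numeric illustration of that check (the 0.5 here is the fallback ratio Spark uses when column statistics are unavailable, governed by spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio, 0.5 by default; the sizes are not from a real plan):

// toy numbers, not from a real plan
val filterRatio                = 0.5         // fallback filter ratio
val partitionedScanSizeInBytes = 100L << 30  // ~100 GB partitioned fact-table scan
val filteringSideLeavesInBytes = 200L << 20  // ~200 MB of leaf nodes on the filtering side

// estimated bytes saved by pruning vs. the estimated size of the filtering side
val hasBenefit = filterRatio * partitionedScanSizeInBytes > filteringSideLeavesInBytes
// hasBenefit == true for these numbers, so the DynamicPruningSubquery is inserted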
Finally, the whole new Join is returned:
Join(newLeft, newRight, joinType, Some(condition), hint)
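Going back to the spark-shell sketch at the top of the post, the effect of the inserted subquery can be observed in the physical plan, where it surfaces as a dynamic pruning expression in the partition filters of the file scan (output below is abbreviated and illustrative; the exact wording depends on the Spark version and table format):

pruned.explain(true)
// ...
// +- FileScan parquet default.fact_sales[...]
//      PartitionFilters: [isnotnull(store_id#12), dynamicpruningexpression(store_id#12 IN dynamicpruning#42)]
// ...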