91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark的動態分區裁剪下物理計劃怎么實現

發布時間:2021-12-09 16:47:14 來源:億速云 閱讀:179 作者:iii 欄目:大數據

本篇內容介紹了“spark的動態分區裁剪下物理計劃怎么實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

背景

本文基于delta 0.7.0 spark 3.0.1 spark 3.x引入了動態分區裁剪,在 spark 的動態分區裁剪上(Dynamic partition pruning)-邏輯計劃我們提到在邏輯計劃階段會加入DynamicPruningSubquery,今天我們分析一下在物理階段怎么對DynamicPruningSubquery進行優化以及實現的

分析

直接轉到PlanDynamicPruningFilters的apply方法:

override def apply(plan: SparkPlan): SparkPlan = {
    if (!SQLConf.get.dynamicPartitionPruningEnabled) {
      return plan
    }

    plan transformAllExpressions {
      case DynamicPruningSubquery(
          value, buildPlan, buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId) =>
        val sparkPlan = QueryExecution.createSparkPlan(
          sparkSession, sparkSession.sessionState.planner, buildPlan)
        // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
        // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
        val canReuseExchange = SQLConf.get.exchangeReuseEnabled && buildKeys.nonEmpty &&
          plan.find {
            case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _) =>
              left.sameResult(sparkPlan)
            case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right) =>
              right.sameResult(sparkPlan)
            case _ => false
          }.isDefined

        if (canReuseExchange) {
          val mode = broadcastMode(buildKeys, buildPlan)
          val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
          // plan a broadcast exchange of the build side of the join
          val exchange = BroadcastExchangeExec(mode, executedPlan)
          val name = s"dynamicpruning#${exprId.id}"
          // place the broadcast adaptor for reusing the broadcast results on the probe side
          val broadcastValues =
            SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
          DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
        } else if (onlyInBroadcast) {
          // it is not worthwhile to execute the query, so we fall-back to a true literal
          DynamicPruningExpression(Literal.TrueLiteral)
        } else {
          // we need to apply an aggregate on the buildPlan in order to be column pruned
          val alias = Alias(buildKeys(broadcastKeyIndex), buildKeys(broadcastKeyIndex).toString)()
          val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
          DynamicPruningExpression(expressions.InSubquery(
            Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
        }
    }
  }
  1. 如果沒有開啟動態分區裁剪,則直接跳過

  2. QueryExecution.createSparkPlan( sparkSession, sparkSession.sessionState.planner, buildPlan) 通過邏輯計劃構造物理計劃

  3. 判斷是否reuseExchange,如果spark.sql.exchange.reuse配置為true,且存在join的是broadcastHashjoin,而且計算結果和要進行過濾的物理計劃的結果一樣,則進行下一步,

  • 進行物理計劃執行前的準備, 得到executedPlan

  • 構建BroadcastExchangeExec,broadcastValues,InSubqueryExec,DynamicPruningExpression,BroadcastExchangeExec內部就是進行spark的broadcast操作 注意:這里的BroadcastExchangeExec會在ReuseExchange規則中被優化, 最終會被BroadcastQueryStageExec調用,從而公用同一個broacast的值

  1. 如果以上不滿足,默認DynamicPruningExpression(Literal.TrueLiteral),也就是不會進行裁剪

  2. 如果不是broadcastHashjoin,但是能夠加速,則按照需要過濾的key做一次聚合,之后再組成DynamicPruningExpression

至此動態裁剪的物理計劃優化就分析完了

“spark的動態分區裁剪下物理計劃怎么實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

郧西县| 将乐县| 六盘水市| 云浮市| 五家渠市| 合阳县| 萨迦县| 郴州市| 南溪县| 沂南县| 监利县| 盐山县| 泾川县| 泰安市| 临泽县| 资溪县| 府谷县| 临猗县| 陆丰市| 河北区| 尉氏县| 喜德县| 江城| 肥乡县| 顺昌县| 新平| 绥中县| 舟山市| 龙游县| 嘉荫县| 张家口市| 苍南县| 垣曲县| 西宁市| 连南| 兴海县| 江口县| 五大连池市| 锡林郭勒盟| 盖州市| 楚雄市|