91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark delta如何讀數據

發布時間:2021-12-16 16:14:02 來源:億速云 閱讀:141 作者:小新 欄目:大數據

小編給大家分享一下spark delta如何讀數據,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

分析

spark 的delta datasource的構建要從DataSource.lookupDataSourceV2開始,之后會流向到loadV1Source,這里會進行dataSource.createRelation進行構建datasource的Relation的構建,直接轉到deltaDataSource 的createRelation:

override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val maybePath = parameters.getOrElse("path", {
      throw DeltaErrors.pathNotSpecifiedException
    })

    // Log any invalid options that are being passed in
    DeltaOptions.verifyOptions(CaseInsensitiveMap(parameters))

    val timeTravelByParams = DeltaDataSource.getTimeTravelVersion(parameters)
    DeltaTableV2(
      sqlContext.sparkSession,
      new Path(maybePath),
      timeTravelOpt = timeTravelByParams).toBaseRelation
  }
  1. DeltaOptions.verifyOptions進行參數校驗,有效的參數如下:

val validOptionKeys : Set[String] = Set(
    REPLACE_WHERE_OPTION,
    MERGE_SCHEMA_OPTION,
    EXCLUDE_REGEX_OPTION,
    OVERWRITE_SCHEMA_OPTION,
    USER_METADATA_OPTION,
    MAX_FILES_PER_TRIGGER_OPTION,
    IGNORE_FILE_DELETION_OPTION,
    IGNORE_CHANGES_OPTION,
    IGNORE_DELETES_OPTION,
    OPTIMIZE_WRITE_OPTION,
    DATA_CHANGE_OPTION,
    "queryName",
    "checkpointLocation",
    "path",
    "timestampAsOf",
    "versionAsOf"
  )
  1. DeltaDataSource.getTimeTravelVersion根據指定的timestampAsOf或者versionAsOf獲取指定的版本

  2. 直接調用DeltaTableV2的toBaseRelation方法:

def toBaseRelation: BaseRelation = {
    if (deltaLog.snapshot.version == -1) {
      val id = catalogTable.map(ct => DeltaTableIdentifier(table = Some(ct.identifier)))
        .getOrElse(DeltaTableIdentifier(path = Some(path.toString)))
      throw DeltaErrors.notADeltaTableException(id)
    }
    val partitionPredicates = DeltaDataSource.verifyAndCreatePartitionFilters(
      path.toString, deltaLog.snapshot, partitionFilters)

    // TODO(burak): We should pass in the snapshot here
    deltaLog.createRelation(partitionPredicates, timeTravelSpec)
  }
  • 如果存在分區,則DeltaDataSource.verifyAndCreatePartitionFilter創建partitionPredicates

  • timeTravelSpec,這里優先選擇用戶指定的timeTravelByParams,否則通過DeltaDataSource.parsePathIdentifier選擇path指定的version,格式如:/some/path/partition=1@v1234 或者/some/path/partition=1@yyyyMMddHHmmssSSS

  • 直接調用deltaLog.createRelation:

    def createRelation(
       partitionFilters: Seq[Expression] = Nil,
       timeTravel: Option[DeltaTimeTravelSpec] = None): BaseRelation = {
    
     val versionToUse = timeTravel.map { tt =>
       val (version, accessType) = DeltaTableUtils.resolveTimeTravelVersion(
         spark.sessionState.conf, this, tt)
       val source = tt.creationSource.getOrElse("unknown")
       recordDeltaEvent(this, s"delta.timeTravel.$source", data = Map(
         "tableVersion" -> snapshot.version,
         "queriedVersion" -> version,
         "accessType" -> accessType
       ))
       version
     }
    
     /** Used to link the files present in the table into the query planner. */
     val snapshotToUse = versionToUse.map(getSnapshotAt(_)).getOrElse(snapshot)
     val fileIndex = TahoeLogFileIndex(
       spark, this, dataPath, snapshotToUse.metadata.schema, partitionFilters, versionToUse)
    
     new HadoopFsRelation(
       fileIndex,
       partitionSchema = snapshotToUse.metadata.partitionSchema,
       dataSchema = snapshotToUse.metadata.schema,
       bucketSpec = None,
       snapshotToUse.fileFormat,
       snapshotToUse.metadata.format.options)(spark) with InsertableRelation {
       def insert(data: DataFrame, overwrite: Boolean): Unit = {
         val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
         WriteIntoDelta(
           deltaLog = DeltaLog.this,
           mode = mode,
           new DeltaOptions(Map.empty[String, String], spark.sessionState.conf),
           partitionColumns = Seq.empty,
           configuration = Map.empty,
           data = data).run(spark)
       }
     }

     

    override def inputFiles: Array[String] = {
    getSnapshot(stalenessAcceptable = false).filesForScan(
      projection = Nil, partitionFilters).files.map(f => absolutePath(f.path).toString).toArray
    }


    該方法調用了snapshot的filesForScan方法:

    def filesForScan(projection: Seq[Attribute], filters: Seq[Expression]): DeltaScan = {
    implicit val enc = SingleAction.addFileEncoder
    
    val partitionFilters = filters.flatMap { filter =>
      DeltaTableUtils.splitMetadataAndDataPredicates(filter, metadata.partitionColumns, spark)._1
    }
    
    val files = DeltaLog.filterFileList(
      metadata.partitionSchema,
      allFiles.toDF(),
      partitionFilters).as[AddFile].collect()
    
    DeltaScan(version = version, files, null, null, null)(null, null, null, null)
    }


    • . 通過指定版本獲取對應的snapshot

    • . 構建TahoeLogFileIndex,因為這里構建的是HadoopFsRelation,所以我們關注TahoeLogFileIndex的inputfiles方法:

通過之前文章的分析,我們直到deltalog記錄了AddFile和Remove記錄,那現在讀數據怎么讀取呢?全部在allFiles方法。
重點看一下:allFiles方法:

def allFiles: Dataset[AddFile] = {
 val implicits = spark.implicits
 import implicits._
 state.where("add IS NOT NULL").select($"add".as[AddFile])
 }

這里調用了state方法,而它又調用了stateReconstruction方法,

private lazy val cachedState =
 cacheDS(stateReconstruction, s"Delta Table State #$version - $redactedPath")

 /** The current set of actions in this [[Snapshot]]. */
 def state: Dataset[SingleAction] = cachedState.getDS

stateReconstruction方法在checkpoint的時用到了,在這里也用到了,主要是重新構造文件狀態,合并AddFile和RemoveFile:

private def stateReconstruction: Dataset[SingleAction] = {
 ...
 loadActions.mapPartitions { actions =>
     val hdpConf = hadoopConf.value.value
     actions.flatMap {
       _.unwrap match {
         case add: AddFile => Some(add.copy(path = canonicalizePath(add.path, hdpConf)).wrap)
         case rm: RemoveFile => Some(rm.copy(path = canonicalizePath(rm.path, hdpConf)).wrap)
         case other if other == null => None
         case other => Some(other.wrap)
       }
     }
    }
   ...
   .mapPartitions { iter =>
     val state = new InMemoryLogReplay(time)
     state.append(0, iter.map(_.unwrap))
     state.checkpoint.map(_.wrap)
   }
  }

而關鍵在于InMemoryLogReplay的append方法和checkpoint方法,這里做到了文件狀態的合并:

  assert(currentVersion == -1 || version == currentVersion + 1,
   s"Attempted to replay version $version, but state is at $currentVersion")
 currentVersion = version
 actions.foreach {
   case a: SetTransaction =>
     transactions(a.appId) = a
   case a: Metadata =>
     currentMetaData = a
   case a: Protocol =>
     currentProtocolVersion = a
   case add: AddFile =>
     activeFiles(add.pathAsUri) = add.copy(dataChange = false)
     // Remove the tombstone to make sure we only output one `FileAction`.
     tombstones.remove(add.pathAsUri)
   case remove: RemoveFile =>
     activeFiles.remove(remove.pathAsUri)
     tombstones(remove.pathAsUri) = remove.copy(dataChange = false)
   case ci: CommitInfo => // do nothing
   case null => // Some crazy future feature. Ignore
  }
 }

重點就在case add: AddFile和 case remove: RemoveFile處理以及checkpoint方法,能夠很好的合并文件狀態。

再調用collect方法,返回DeltaScan,之后獲取文件路徑作為要處理的文件路徑。

  • 把TahoeLogFileIndex傳入HadoopFsRelation得到最后的BaseRelation 返回

注意:spark讀取delta格式整個流程和spark讀取其他數據格式流程一致,主要區別在于讀取數據之前,會把文件狀態在內存中進行一次合并,這樣只需要讀取文件狀態為Addfile的就行了

以上是“spark delta如何讀數據”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

武隆县| 武川县| 淳化县| 长丰县| 济源市| 滕州市| 昆山市| 洮南市| 广安市| 上栗县| 翁牛特旗| 铁力市| 同仁县| 福泉市| 新泰市| 乌兰察布市| 醴陵市| 什邡市| 元朗区| 五家渠市| 祁门县| 东山县| 蒙山县| 安阳市| 武威市| 从江县| 城口县| 青浦区| 裕民县| 榕江县| 青阳县| 遵化市| 翼城县| 芮城县| 习水县| 沾化县| 东乌| 赣州市| 耿马| 庆云县| 乐都县|