In this post I'd like to share how Spark reads data in the Delta format. Many readers may not be familiar with the details, so this article walks through the read path step by step; hopefully you'll find it a useful reference.
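Everything below traces what happens under a plain batch read of a Delta path. As a minimal sketch of that entry point (assuming the Delta Lake package is on the classpath; the paths and app name here are just placeholders, not anything from the Delta source):

import org.apache.spark.sql.SparkSession

// The user-facing call whose read path the rest of this post traces.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("delta-read-demo")
  .getOrCreate()

val df = spark.read.format("delta").load("/tmp/delta/events")
df.show()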
Construction of Spark's Delta data source starts at DataSource.lookupDataSourceV2; the flow then falls back to loadV1Source, where dataSource.createRelation builds the data source's Relation. Jump straight to DeltaDataSource.createRelation:
override def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String]): BaseRelation = {
  val maybePath = parameters.getOrElse("path", {
    throw DeltaErrors.pathNotSpecifiedException
  })

  // Log any invalid options that are being passed in
  DeltaOptions.verifyOptions(CaseInsensitiveMap(parameters))

  val timeTravelByParams = DeltaDataSource.getTimeTravelVersion(parameters)

  DeltaTableV2(
    sqlContext.sparkSession,
    new Path(maybePath),
    timeTravelOpt = timeTravelByParams).toBaseRelation
}
DeltaOptions.verifyOptions validates the passed-in options; the valid option keys are:
val validOptionKeys: Set[String] = Set(
  REPLACE_WHERE_OPTION,
  MERGE_SCHEMA_OPTION,
  EXCLUDE_REGEX_OPTION,
  OVERWRITE_SCHEMA_OPTION,
  USER_METADATA_OPTION,
  MAX_FILES_PER_TRIGGER_OPTION,
  IGNORE_FILE_DELETION_OPTION,
  IGNORE_CHANGES_OPTION,
  IGNORE_DELETES_OPTION,
  OPTIMIZE_WRITE_OPTION,
  DATA_CHANGE_OPTION,
  "queryName",
  "checkpointLocation",
  "path",
  "timestampAsOf",
  "versionAsOf"
)
DeltaDataSource.getTimeTravelVersion resolves the requested table version from the timestampAsOf or versionAsOf option.
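As a usage sketch, these two options are passed straight through the DataFrameReader (table path, version number and timestamp are placeholders):

// Read an older version by number, or by timestamp.
val byVersion = spark.read.format("delta")
  .option("versionAsOf", 0)
  .load("/tmp/delta/events")

val byTimestamp = spark.read.format("delta")
  .option("timestampAsOf", "2021-01-01 00:00:00")
  .load("/tmp/delta/events")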
It then calls DeltaTableV2's toBaseRelation method directly:
def toBaseRelation: BaseRelation = {
  if (deltaLog.snapshot.version == -1) {
    val id = catalogTable.map(ct => DeltaTableIdentifier(table = Some(ct.identifier)))
      .getOrElse(DeltaTableIdentifier(path = Some(path.toString)))
    throw DeltaErrors.notADeltaTableException(id)
  }
  val partitionPredicates = DeltaDataSource.verifyAndCreatePartitionFilters(
    path.toString, deltaLog.snapshot, partitionFilters)

  // TODO(burak): We should pass in the snapshot here
  deltaLog.createRelation(partitionPredicates, timeTravelSpec)
}
If the table is partitioned, DeltaDataSource.verifyAndCreatePartitionFilters builds partitionPredicates from the partition filters.
For timeTravelSpec, the user-supplied timeTravelByParams takes precedence; otherwise DeltaDataSource.parsePathIdentifier extracts the version encoded in the path, using formats such as /some/path/partition=1@v1234 or /some/path/partition=1@yyyyMMddHHmmssSSS.
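A hedged sketch of that path-embedded form, following the format described above (paths and version values are placeholders; whether this syntax is accepted depends on the Delta version you run):

// Time travel encoded directly in the path, as parsed by DeltaDataSource.parsePathIdentifier.
val atVersion   = spark.read.format("delta").load("/some/path@v1234")
val atTimestamp = spark.read.format("delta").load("/some/path@20210101000000000")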
Next it calls deltaLog.createRelation directly:
def createRelation(
    partitionFilters: Seq[Expression] = Nil,
    timeTravel: Option[DeltaTimeTravelSpec] = None): BaseRelation = {
  val versionToUse = timeTravel.map { tt =>
    val (version, accessType) = DeltaTableUtils.resolveTimeTravelVersion(
      spark.sessionState.conf, this, tt)
    val source = tt.creationSource.getOrElse("unknown")
    recordDeltaEvent(this, s"delta.timeTravel.$source", data = Map(
      "tableVersion" -> snapshot.version,
      "queriedVersion" -> version,
      "accessType" -> accessType
    ))
    version
  }

  /** Used to link the files present in the table into the query planner. */
  val snapshotToUse = versionToUse.map(getSnapshotAt(_)).getOrElse(snapshot)

  val fileIndex = TahoeLogFileIndex(
    spark, this, dataPath, snapshotToUse.metadata.schema, partitionFilters, versionToUse)

  new HadoopFsRelation(
    fileIndex,
    partitionSchema = snapshotToUse.metadata.partitionSchema,
    dataSchema = snapshotToUse.metadata.schema,
    bucketSpec = None,
    snapshotToUse.fileFormat,
    snapshotToUse.metadata.format.options)(spark) with InsertableRelation {

    def insert(data: DataFrame, overwrite: Boolean): Unit = {
      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
      WriteIntoDelta(
        deltaLog = DeltaLog.this,
        mode = mode,
        new DeltaOptions(Map.empty[String, String], spark.sessionState.conf),
        partitionColumns = Seq.empty,
        configuration = Map.empty,
        data = data).run(spark)
    }
  }
}
createRelation does two things:

1. Resolve the snapshot for the requested version.
2. Build a TahoeLogFileIndex. Since the result is a HadoopFsRelation, the part we care about is TahoeLogFileIndex's inputFiles method:

override def inputFiles: Array[String] = {
  getSnapshot(stalenessAcceptable = false).filesForScan(
    projection = Nil, partitionFilters).files.map(f => absolutePath(f.path).toString).toArray
}

This method in turn calls the snapshot's filesForScan method:

def filesForScan(projection: Seq[Attribute], filters: Seq[Expression]): DeltaScan = {
  implicit val enc = SingleAction.addFileEncoder

  val partitionFilters = filters.flatMap { filter =>
    DeltaTableUtils.splitMetadataAndDataPredicates(filter, metadata.partitionColumns, spark)._1
  }

  val files = DeltaLog.filterFileList(
    metadata.partitionSchema,
    allFiles.toDF(),
    partitionFilters).as[AddFile].collect()

  DeltaScan(version = version, files, null, null, null)(null, null, null, null)
}
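To make the partition-pruning step concrete, here is a deliberately simplified sketch of the idea behind DeltaLog.filterFileList: keep only the AddFile entries whose partitionValues satisfy the partition predicates. AddFileLite and pruneByPartition are made-up names for illustration, not Delta APIs; the real implementation evaluates Catalyst expressions against a DataFrame of actions rather than a Scala collection.

// Each AddFile carries the partition values of the file it points to.
case class AddFileLite(path: String, partitionValues: Map[String, String])

// Keep only the files whose partition value matches the predicate.
def pruneByPartition(files: Seq[AddFileLite], column: String, value: String): Seq[AddFileLite] =
  files.filter(_.partitionValues.get(column).contains(value))

val files = Seq(
  AddFileLite("date=2021-01-01/part-0.parquet", Map("date" -> "2021-01-01")),
  AddFileLite("date=2021-01-02/part-1.parquet", Map("date" -> "2021-01-02")))

pruneByPartition(files, "date", "2021-01-01")  // keeps only the first entry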
From the analysis in earlier posts we know that the delta log records AddFile and RemoveFile actions. So how does a read figure out which files to scan? The answer is entirely in the allFiles method.

Let's take a close look at allFiles:
def allFiles: Dataset[AddFile] = {
  val implicits = spark.implicits
  import implicits._
  state.where("add IS NOT NULL").select($"add".as[AddFile])
}
This uses the state method, which in turn is built from the stateReconstruction method:
private lazy val cachedState =
  cacheDS(stateReconstruction, s"Delta Table State #$version - $redactedPath")

/** The current set of actions in this [[Snapshot]]. */
def state: Dataset[SingleAction] = cachedState.getDS
stateReconstruction is also used when writing checkpoints; here it is used to rebuild the table's file state, i.e. to merge the AddFile and RemoveFile actions:
private def stateReconstruction: Dataset[SingleAction] = {
  ...
  loadActions.mapPartitions { actions =>
    val hdpConf = hadoopConf.value.value
    actions.flatMap {
      _.unwrap match {
        case add: AddFile => Some(add.copy(path = canonicalizePath(add.path, hdpConf)).wrap)
        case rm: RemoveFile => Some(rm.copy(path = canonicalizePath(rm.path, hdpConf)).wrap)
        case other if other == null => None
        case other => Some(other.wrap)
      }
    }
  }
  ...
  .mapPartitions { iter =>
    val state = new InMemoryLogReplay(time)
    state.append(0, iter.map(_.unwrap))
    state.checkpoint.map(_.wrap)
  }
}
The key pieces are InMemoryLogReplay's append and checkpoint methods, which is where the file state actually gets merged:
def append(version: Long, actions: Iterator[Action]): Unit = {
  assert(currentVersion == -1 || version == currentVersion + 1,
    s"Attempted to replay version $version, but state is at $currentVersion")
  currentVersion = version
  actions.foreach {
    case a: SetTransaction =>
      transactions(a.appId) = a
    case a: Metadata =>
      currentMetaData = a
    case a: Protocol =>
      currentProtocolVersion = a
    case add: AddFile =>
      activeFiles(add.pathAsUri) = add.copy(dataChange = false)
      // Remove the tombstone to make sure we only output one `FileAction`.
      tombstones.remove(add.pathAsUri)
    case remove: RemoveFile =>
      activeFiles.remove(remove.pathAsUri)
      tombstones(remove.pathAsUri) = remove.copy(dataChange = false)
    case ci: CommitInfo => // do nothing
    case null => // Some crazy future feature. Ignore
  }
}
The important parts are the handling of case add: AddFile and case remove: RemoveFile, together with the checkpoint method; between them the file state is merged cleanly.
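A stripped-down sketch of that merge logic may help. ReplayLite, AddLite and RemoveLite are made-up names for illustration; the real InMemoryLogReplay additionally tracks metadata, protocol and transaction actions:

import scala.collection.mutable

final case class AddLite(path: String)
final case class RemoveLite(path: String)

// Replay actions in log order: a later remove cancels an earlier add,
// and re-adding a path clears its tombstone. Whatever remains active is what gets scanned.
class ReplayLite {
  private val active = mutable.Map.empty[String, AddLite]
  private val tombstones = mutable.Map.empty[String, RemoveLite]

  def append(actions: Iterator[Any]): Unit = actions.foreach {
    case a: AddLite =>
      active(a.path) = a
      tombstones.remove(a.path)   // re-adding cancels the tombstone
    case r: RemoveLite =>
      active.remove(r.path)       // the remove wins over the earlier add
      tombstones(r.path) = r
    case _ =>                     // other action types ignored in this sketch
  }

  def activeFiles: Seq[AddLite] = active.values.toSeq
}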
filesForScan then calls collect and returns a DeltaScan, from which the file paths to be processed are taken.
Finally, the TahoeLogFileIndex is passed into a HadoopFsRelation, and the resulting BaseRelation is returned.
Note: reading the Delta format follows the same overall flow as reading any other data format in Spark. The main difference is that before any data is read, the file state is merged in memory, so only the files whose latest action is an AddFile need to be scanned.
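A small end-to-end sketch of that point, reusing the SparkSession from the first snippet (the local path is a placeholder): after two overwrites, the second commit's RemoveFile actions cancel the first commit's AddFile actions during state reconstruction, so the read only scans the latest files even though the older Parquet files may still sit on disk until they are vacuumed.

spark.range(0, 5).write.format("delta").mode("overwrite").save("/tmp/delta/demo")
spark.range(100, 105).write.format("delta").mode("overwrite").save("/tmp/delta/demo")

// Only the values 100..104 come back: the first commit's files are no longer "active".
spark.read.format("delta").load("/tmp/delta/demo").show()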
That's all for "how Spark Delta reads data". Thanks for reading; hopefully the walkthrough gave you a clearer picture, and helps the next time you dig into the Delta source.