您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關如何進行數據湖deltalake中的時間旅行及版本管理,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
deltalake支持數據版本管理和時間旅行:提供了數據快照,使開發人員能夠訪問和還原早期版本的數據以進行審核、回滾或重新計算。
1.場景
delta lake的時間旅行,實際上就是利用多版本管理機制,查詢歷史的delta 表快照。時間旅行有以下使用案例:
1).可以重復創建數據分析,報告或者一些輸出(比如,機器學習模型)。這主要是有利于調試和安全審查,尤其是在受管制的行業里。
2).編寫復雜的基于時間的查詢。
3).修正數據中的錯誤信息。
4).為一組查詢提供快照隔離,以快速變更表。
2.配置
DataframeTable支持創建dataframe的時候指定一個delta lake表的版本信息:
val df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")val df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")
對于版本號,直接傳入一個版本數值即可,如下:
val df2 = spark.read.format("delta").option("versionAsOf", 0).table(tableName)
對于timestamp字符串,必須要是date格式或者timestamp格式。例如:
val df1 = spark.read.format("delta").option("timestampAsOf", "2020-06-28").load("/delta/events")
val df1 = spark.read.format("delta").option("timestampAsOf", "2020-06-28T00:00:00.000Z").load("/delta/events")
由于delta lake的表是存在更新的情況,所以多次讀取數據生成的dataframe之間會有差異,因為兩次讀取數據可能是一次是數據更新前,另一次是數據更新后。使用時間旅行你就可以在多次調用之間修復數據。
val latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/delta/events`)").collect()val df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/delta/events")
3.數據保存時間
默認情況下,deltalake保存最近30天的提交歷史。這就意味著可以指定30天之前的版本來讀取數據,但是有些注意事項:
3.1 沒對delta 表調用VACUUM函數。VACUUM函數是用來刪除不在引用的delta表和一些超過保留時間的表,支持sql和API形式。
slq表達式:
VACUUM eventsTable -- vacuum files not required by versions older than the default retention period
VACUUM '/data/events' -- vacuum files in path-based table
VACUUM delta.`/data/events/`
VACUUM delta.`/data/events/` RETAIN 100 HOURS -- vacuum files not required by versions more than 100 hours old
VACUUM eventsTable DRY RUN -- do dry run to get the list of files to be deleted
?
scala API 表達式
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
deltaTable.vacuum() // vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100) // vacuum files not required by versions more than 100 hours old
可以通過下面兩個delta 表屬性配置來
delta.logRetentionDuration =“ interval <interval>”:控制將表的歷史記錄保留多長時間。每次寫入checkpoint時,都會自動清除早于保留間隔的日志。如果將此配置設置為足夠大的值,則會保留許多日志。這不會影響性能,因為針對日志的操作是常量時間。歷史記錄的操作是并行的(但是隨著日志大小的增加,它將變得更加耗時)。默認值為 interval 30 days。
delta.deletedFileRetentionDuration =“ interval <interval>”:在這個時間范圍內的數據是不會被VACUUM命令刪除。默認值為間隔7天。要訪問30天的歷史數據,請設置delta.deletedFileRetentionDuration = "interval 30 days"。此設置可能會導致您的存儲成本上升。
注意:VACUUM命令是不會刪除日志文件的,日志文件是在checkpoint之后自動刪除的。
為了讀取之前版本的數據,必須要保留該版本的日志文件和數據文件。
4.案例
修復意外刪除的用戶111的數據。
INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
修復錯誤更新的數據
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
查詢過去七天新增的消費者數:
SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
以上就是如何進行數據湖deltalake中的時間旅行及版本管理,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。