91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行數據湖deltalake中的時間旅行及版本管理

發布時間:2021-12-23 16:54:40 來源:億速云 閱讀:147 作者:柒染 欄目:大數據

本篇文章給大家分享的是有關如何進行數據湖deltalake中的時間旅行及版本管理,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

deltalake支持數據版本管理和時間旅行:提供了數據快照,使開發人員能夠訪問和還原早期版本的數據以進行審核、回滾或重新計算。

1.場景

delta lake的時間旅行,實際上就是利用多版本管理機制,查詢歷史的delta 表快照。時間旅行有以下使用案例:

1).可以重復創建數據分析,報告或者一些輸出(比如,機器學習模型)。這主要是有利于調試和安全審查,尤其是在受管制的行業里。

2).編寫復雜的基于時間的查詢。

3).修正數據中的錯誤信息。

4).為一組查詢提供快照隔離,以快速變更表。

2.配置

DataframeTable支持創建dataframe的時候指定一個delta lake表的版本信息:

val df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")val df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

對于版本號,直接傳入一個版本數值即可,如下:

val df2 = spark.read.format("delta").option("versionAsOf", 0).table(tableName)

對于timestamp字符串,必須要是date格式或者timestamp格式。例如:

val df1 = spark.read.format("delta").option("timestampAsOf", "2020-06-28").load("/delta/events")val df1 = spark.read.format("delta").option("timestampAsOf", "2020-06-28T00:00:00.000Z").load("/delta/events")

由于delta lake的表是存在更新的情況,所以多次讀取數據生成的dataframe之間會有差異,因為兩次讀取數據可能是一次是數據更新前,另一次是數據更新后。使用時間旅行你就可以在多次調用之間修復數據。

val latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/delta/events`)").collect()val df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/delta/events")

3.數據保存時間

默認情況下,deltalake保存最近30天的提交歷史。這就意味著可以指定30天之前的版本來讀取數據,但是有些注意事項:

3.1 沒對delta 表調用VACUUM函數。VACUUM函數是用來刪除不在引用的delta表和一些超過保留時間的表,支持sql和API形式。

slq表達式:

VACUUM eventsTable   -- vacuum files not required by versions older than the default retention period
VACUUM '/data/events' -- vacuum files in path-based table
VACUUM delta.`/data/events/`
VACUUM delta.`/data/events/` RETAIN 100 HOURS  -- vacuum files not required by versions more than 100 hours old
VACUUM eventsTable DRY RUN    -- do dry run to get the list of files to be deleted

?

scala API 表達式

import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
deltaTable.vacuum()        // vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100)     // vacuum files not required by versions more than 100 hours old

可以通過下面兩個delta 表屬性配置來

  • delta.logRetentionDuration =“ interval <interval>”:控制將表的歷史記錄保留多長時間。每次寫入checkpoint時,都會自動清除早于保留間隔的日志。如果將此配置設置為足夠大的值,則會保留許多日志。這不會影響性能,因為針對日志的操作是常量時間。歷史記錄的操作是并行的(但是隨著日志大小的增加,它將變得更加耗時)。默認值為 interval 30 days。

  • delta.deletedFileRetentionDuration =“ interval <interval>”:在這個時間范圍內的數據是不會被VACUUM命令刪除。默認值為間隔7天。要訪問30天的歷史數據,請設置delta.deletedFileRetentionDuration = "interval 30 days"。此設置可能會導致您的存儲成本上升。

注意:VACUUM命令是不會刪除日志文件的,日志文件是在checkpoint之后自動刪除的。

為了讀取之前版本的數據,必須要保留該版本的日志文件和數據文件。

4.案例

修復意外刪除的用戶111的數據。

INSERT INTO my_table  SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)  WHERE userId = 111

修復錯誤更新的數據

MERGE INTO my_table target  USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source  ON source.userId = target.userId  WHEN MATCHED THEN UPDATE SET *

查詢過去七天新增的消費者數:

  SELECT count(distinct userId)  FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))

以上就是如何進行數據湖deltalake中的時間旅行及版本管理,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

阜新| 庆安县| 琼结县| 吉安县| 桂平市| 商丘市| 兰考县| 木里| 鹤山市| 阿鲁科尔沁旗| 江西省| 福泉市| 北宁市| 南川市| 汾阳市| 兰西县| 凤凰县| 波密县| 宜兰市| 大安市| 通渭县| 江源县| 吕梁市| 昌平区| 浮山县| 白银市| 外汇| 巴中市| 翁牛特旗| 江都市| 吴江市| 杭锦后旗| 湟中县| 屏山县| 项城市| 三明市| 临邑县| 白沙| 邵武市| 宕昌县| 丰县|