您好,登錄后才能下訂單哦!
這篇文章給大家介紹如何進行數據湖deltalake流表的讀寫,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
delta lake和 spark structured streaming可以深度整合。delta lake克服了很多常見的與流系統和文件整合帶來的相關限制,如下:
保證了多個流(或并發批處理作業)的僅一次處理。
當使用文件作為流源時,可以有效地發現哪些文件是新文件。
1. 作為stream source
1.1 案例講解
當你的structured streaming使用delta lake作為stream source的時候,應用會處理delta 表中已有的數據,以及delta 表新增的數據。
spark.readStream.format("delta").load("/delta/events")
也可以做一些優化,如下:
a.通過maxFilesPerTrigger配置控制structured streaming從delta lake加載的微批文件數。要知道Structured streaming也是微批的概念。該參數就是控制每次trigger計算的最大新增文件數,默認是1000,實際情況要根據數據量和資源數量進行控制。
b.通過maxBytesPerTrigger控制每次trigger處理的最大數據量。這是設置一個“ soft max”,這意味著一個批處理大約可以處理此數量的數據,并且可能處理的數量超出這個限制。如果使用的是Trigger.Once,則 此配置無效。如果將此配置與maxFilesPerTrigger結合使用,兩個參數任意一個達到臨屆條件,都會生效。
1.2 忽略更新和刪除
structured streaming不處理不是追加的輸入數據,并且如果對作為source的delta table的表進行了任何修改,則structured streaming會拋出異常。 對于變更常見的企業場景,提供了兩種策略,來處理對delta 表變更給structured streaming 任務造成的影響:
可以刪除輸出和checkpoint,并重新啟動structured streaming對數據計算,也即是重新計算一次。
可以設置以下兩個選項之一:
ignoreDeletes:忽略在分區表中刪除數據的事務。
ignoreChanges:如果由于諸如UPDATE,MERGE INTO,DELETE(在分區內)或OVERWRITE之類的數據更改操作而不得不在源表中重寫文件,則重新處理更新的文件。因此未更改的行仍可能會處理并向下游傳輸,因此structured streaming的下游應該能夠處理重復數據。刪除不會傳輸到下游。ignoreChanges包含ignoreDeletes。因此,如果使用ignoreChanges,則流不會因源表的刪除或更新而中斷。
1.3 案例
假設有一張表叫做user_events,有三個字段:date,user_email,action,而且該表以date字段進行分區。structured streaming區處理這張表,且還有其程序會對該delta 表進行插入和刪除操作。
假設僅僅是刪除操作,可以這么配置stream:
events.readStream .format("delta") .option("ignoreDeletes", "true") .load("/delta/user_events")
假設對delta表修改操作,可以這么配置stream:
events.readStream .format("delta") .option("ignoreChanges", "true") .load("/delta/user_events")
如果使用UPDATE語句更新了user_email字段某個值,則包含相關user_email的文件將被重寫,這個是delta lake更改操作實現機制后面會講。使用ignoreChanges時,新記錄將與同一文件中的所有其他未更改記錄一起向下游傳輸。 所以下游程序應該能夠處理這些傳入的重復記錄。
2.delta 表作為sink
delta table可以作為Structured Streaming的sink使用。delta lake的事務日志確保了其能實現僅一次處理。
2.1 append mode
默認是append 模式,僅僅是追加數據到delta 表:
events.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json") .start("/delta/events") // as a path
2.2 complete mode
也可以使用Structured Streaming每個批次覆蓋一次整張表。在某些聚合場景下會用到該模式:
.format("delta") .load("/delta/events") .groupBy("customerId") .count() .writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg") .start("/delta/eventsByCustomer")
對于延遲要求更寬松的應用程序,可以使用Trigger.Once來節省計算資源。once trigger每次處理從開始到最新的數據,典型的kappa模型,很適合這種場景了。
關于如何進行數據湖deltalake流表的讀寫就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。