91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行數據湖deltalake流表的讀寫

發布時間:2021-12-23 16:47:38 來源:億速云 閱讀:129 作者:柒染 欄目:大數據

這篇文章給大家介紹如何進行數據湖deltalake流表的讀寫,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

delta lake和 spark structured streaming可以深度整合。delta lake克服了很多常見的與流系統和文件整合帶來的相關限制,如下:

  • 保證了多個流(或并發批處理作業)的僅一次處理。

  • 當使用文件作為流源時,可以有效地發現哪些文件是新文件。

1. 作為stream source

1.1 案例講解

當你的structured streaming使用delta lake作為stream source的時候,應用會處理delta 表中已有的數據,以及delta 表新增的數據。

spark.readStream.format("delta").load("/delta/events")

也可以做一些優化,如下:

a.通過maxFilesPerTrigger配置控制structured streaming從delta lake加載的微批文件數。要知道Structured streaming也是微批的概念。該參數就是控制每次trigger計算的最大新增文件數,默認是1000,實際情況要根據數據量和資源數量進行控制。

b.通過maxBytesPerTrigger控制每次trigger處理的最大數據量。這是設置一個“ soft max”,這意味著一個批處理大約可以處理此數量的數據,并且可能處理的數量超出這個限制。如果使用的是Trigger.Once,則 此配置無效。如果將此配置與maxFilesPerTrigger結合使用,兩個參數任意一個達到臨屆條件,都會生效。

1.2 忽略更新和刪除

structured streaming不處理不是追加的輸入數據,并且如果對作為source的delta table的表進行了任何修改,則structured streaming會拋出異常。 對于變更常見的企業場景,提供了兩種策略,來處理對delta 表變更給structured streaming 任務造成的影響:

  • 可以刪除輸出和checkpoint,并重新啟動structured streaming對數據計算,也即是重新計算一次。

  • 可以設置以下兩個選項之一:

    • ignoreDeletes:忽略在分區表中刪除數據的事務。

    • ignoreChanges:如果由于諸如UPDATE,MERGE INTO,DELETE(在分區內)或OVERWRITE之類的數據更改操作而不得不在源表中重寫文件,則重新處理更新的文件。因此未更改的行仍可能會處理并向下游傳輸,因此structured streaming的下游應該能夠處理重復數據。刪除不會傳輸到下游。ignoreChanges包含ignoreDeletes。因此,如果使用ignoreChanges,則流不會因源表的刪除或更新而中斷。

1.3 案例

假設有一張表叫做user_events,有三個字段:date,user_email,action,而且該表以date字段進行分區。structured streaming區處理這張表,且還有其程序會對該delta 表進行插入和刪除操作。

假設僅僅是刪除操作,可以這么配置stream:

events.readStream  .format("delta")  .option("ignoreDeletes", "true")  .load("/delta/user_events")

假設對delta表修改操作,可以這么配置stream:

events.readStream  .format("delta")  .option("ignoreChanges", "true")  .load("/delta/user_events")

如果使用UPDATE語句更新了user_email字段某個值,則包含相關user_email的文件將被重寫,這個是delta lake更改操作實現機制后面會講。使用ignoreChanges時,新記錄將與同一文件中的所有其他未更改記錄一起向下游傳輸。 所以下游程序應該能夠處理這些傳入的重復記錄。

2.delta 表作為sink

delta table可以作為Structured Streaming的sink使用。delta lake的事務日志確保了其能實現僅一次處理。

2.1 append mode

默認是append 模式,僅僅是追加數據到delta 表:

events.writeStream  .format("delta")  .outputMode("append")  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")  .start("/delta/events") // as a path

2.2 complete mode

也可以使用Structured Streaming每個批次覆蓋一次整張表。在某些聚合場景下會用到該模式:

  .format("delta")  .load("/delta/events")  .groupBy("customerId")  .count()  .writeStream  .format("delta")  .outputMode("complete")  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")  .start("/delta/eventsByCustomer")

對于延遲要求更寬松的應用程序,可以使用Trigger.Once來節省計算資源。once trigger每次處理從開始到最新的數據,典型的kappa模型,很適合這種場景了。

關于如何進行數據湖deltalake流表的讀寫就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

苗栗市| 双鸭山市| 雷山县| 鄂温| 铁岭市| 崇阳县| 永兴县| 称多县| 饶河县| 普兰县| 碌曲县| 共和县| 丽水市| 称多县| 青州市| 新竹市| 根河市| 望城县| 南皮县| 高密市| 本溪| 兴和县| 郎溪县| 鱼台县| 临潭县| 白沙| 齐齐哈尔市| 永修县| 磐石市| 黎城县| 宁武县| 库伦旗| 阿巴嘎旗| 成安县| 温宿县| 皋兰县| 嘉义县| 乌鲁木齐县| 张家口市| 五家渠市| 米泉市|