91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行delta lake 的curd操作

發布時間:2021-12-23 16:48:25 來源:億速云 閱讀:162 作者:柒染 欄目:大數據

這篇文章給大家介紹如何進行delta lake 的curd操作,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

delta lake 的表支持刪除和更新數據的語法,下面主要是從sql和scala兩個語法說起吧。

1. 刪除delta 表數據

可以根據查詢條件,從delta表中刪除數據,比如刪除日期在2017年之前的數據,sql和scala的表達語法如下。

sql

DELETE FROM events WHERE date < '2017-01-01'
DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'

scala

import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete("date < '2017-01-01'")        // predicate using SQL formatted string
import org.apache.spark.sql.functions._import spark.implicits._
deltaTable.delete(col("date") < "2017-01-01")       // predicate using Spark SQL functions and implicits

請注意,delete操作會將數據從delta 表的最新版本中刪除,但其實只有到歷史版本直接被vacuum清空的時候,才會從物理存儲中刪除數據。

2. 更新表

可以更新滿足條件的表。比如想更新eventType的字段字符串的編寫失誤,可以使用下面的表達,sql和scala的表達分別如下:

sql

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'

scala

import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string  "eventType = 'clck'",  Map("eventType" -> "'click'")
import org.apache.spark.sql.functions._import spark.implicits._
deltaTable.update(                // predicate using Spark SQL functions and implicits  col("eventType") === "clck",  Map("eventType" -> lit("click")));

3.merge算子實現upsert操作

使用merge操作可以將source表,view,dataframe中的數據upsert到目標的delta lake表中。該操作很像傳統數據庫的merge into操作,但是額外的支持刪除操作,和更新,插入和刪除的額外條件。

假設你計算過程中生成了一個dataframe,元素是events,包含eventId。而且該dataframe中數據部分數據的eventId已經在events表中存在了。這個時候就可以使用merge into實現,eventId存在的話就更新其對應的值,不存在就插入其對應的值。實現表達式如下:

sql

MERGE INTO eventsUSING updatesON events.eventId = updates.eventIdWHEN MATCHED THEN  UPDATE SET events.data = updates.dataWHEN NOT MATCHED  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

scala

import io.delta.tables._import org.apache.spark.sql.functions._
val updatesDF = ...  // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")  .as("events")  .merge(    updatesDF.as("updates"),    "events.eventId = updates.eventId")  .whenMatched  .updateExpr(    Map("data" -> "updates.data"))  .whenNotMatched  .insertExpr(    Map(      "date" -> "updates.date",      "eventId" -> "updates.eventId",      "data" -> "updates.data"))  .execute()

關于如何進行delta lake 的curd操作就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

班戈县| 乃东县| 通化县| 景泰县| 米林县| 饶河县| 中江县| 石柱| 松滋市| 兴安盟| 铅山县| 班戈县| 科尔| 沁源县| 格尔木市| 枣庄市| 高密市| 密山市| 长岛县| 开封县| 郁南县| 弥勒县| 黑水县| 大丰市| 繁昌县| 嵊泗县| 杨浦区| 商南县| 奉节县| 兴安盟| 鄂州市| 侯马市| 隆安县| 额济纳旗| 六枝特区| 孝感市| 临猗县| 新民市| 防城港市| 北票市| 张家界市|