您好,登錄后才能下訂單哦!
delta lake的merge操作以及性能調優是怎樣的,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
鑒于merge操作的復雜性,下面主要對其進行展開講解。
1.merge算子操作語法
merge操作的sql表達如下:
import io.delta.tables._
import org.apache.spark.sql.functions._
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
merge 編碼操作還是有些約束需要詳細描述的。
1.1 可以有(1,2,3)個wenMatched或者whenNotMatched的子語句。其中,whenMatched操作最多有兩個語句,whenNotMatched最多有一個子語句。
1.2 當源表的數據和目標表的數據滿足匹配條件的時候,執行的是whenMatched語句。這些語句可以有以下幾個語義:
a) whenMatched語句最多有一個update和一個delete表達。merge中的update行為僅僅更新滿足條件的目標表一行數據的指定列。而delete操作會刪除所有匹配的行。
b) 每個whenMatched語句都可以有一個可選的條件。如果該可選的條件存在,update和delete操作僅僅在該可選條件為true的時候,才會在匹配的目標數據上執行相應操作。
c) 如果有兩個whenMatched子句,則將按照它們被指定的順序(即,子句的順序很重要)進行執行。第一個子句必須具有一個子句條件(否則,第二個子句將永遠不會執行)。
d) 如果兩個whenMatched子語句都有條件并且兩個子語句的條件都不為true,那不會對目標數據進行任何修改。
c) 支持滿足條件的源dataset中相關行的所有列同時更新到目標detla表的相關列,表達式如下:
whenMatched(...).updateAll()
等價于:
whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
要保證源表和目標表有相同的列,否則會拋出異常。
1.3 給定的條件,源表的一行數據,跟目標表沒有完成匹配的時候執行whenNotMatched語句。該子語句有以下語法:
a) whenNotMatched僅僅支持insert表達。根據指定的列和相關的條件,該操作會在目標表中插入一條新的數據,當目標表中存在的列沒有明確的指定的時候,就插入null。
b) whenNotMatched語句可以有可選條件。如果指定了可選條件,數據僅僅會在可選條件為true的時候才會插入。否則,源列會被忽略。
c) 也可以插入匹配目標表相關行的所有源表行的數據列,表達式:
whenNotMatched(...).insertAll()
等價于:
whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
要保證源表和目標表有相同的列,否則就會拋出異常。
2.schema校驗
merge操作會自動校驗insert和update操作產生額數據schema是否與目標表的schema匹配。規則如下:
a) 對于update和insert行為,指定的目標列必須在目標delta lake表中存在。
b) 對于updateAll和insertAll操作,源dataset必須包含所有目標表的列。源dataset可以有目標表中不存在的列,但是這些列會被忽略。當然也可以通過配置保留僅源dataset有的列。
c) 對于所有操作,如果由生成目標列的表達式生成的數據類型與目標Delta表中的對應列不同,則merge嘗試將其強制轉換為表中的類型。
3.自動schema轉換
默認情況下,updateAll和insertAll操作僅僅會更新或插入在目標表中有的相同列名的列,對于僅僅在源dataset中存在而目標表中不存在的列,會被忽略。但是有些場景下,我們希望保留源dataset中新增的列。首先需要將前面介紹的一個參數spark.databricks.delta.schema.autoMerge.enabled設置為true。
注意:
a. schema自動增加僅僅是針對updateAll操作或者insertAll操作,或者兩者。
b. 僅僅頂層的列會被更改,而不是嵌套的列。
c. 更新和插入操作不能顯式引用目標表中不存在的目標列(即使其中有updateAll或insertAll作為子句之一)。
4.schema推斷與否對比
據一些例子,進行schema自動推斷與不自動推斷的對比
對比一
目標列(key,value),源列(key,value,newValue),對源源表執行下面的sql操作:
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().updateAll() .whenNotMatched().insertAll() .execute()
沒有使用自動schema推斷的話:目標表的schema信息是不會變的。僅僅key,value列被更新。
使用了schema推斷的話:表的schema就會演變為(key,value,newValue)。updateAll操作,會更新value和newValue列。對于insertAll操作會插入整行(key,value,newValue)。
對比二
目標表(key,oldValue),源表(key,newValue),對源表執行下面的sql:
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().updateAll() .whenNotMatched().insertAll() .execute()
不使用schema推斷:updateAll和insertAll操作都會拋異常。
使用schema推斷:表的shema會演變為(key,oldValue,newValue)。updateAll操作會更新key和value列,而oldValue列不變。insertAll操作會插入(key,null,newValue),oldValue會插入null。
對比三
目標表(key,oldValue),源表(key,newValue),對源表執行下面的sql
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().update(Map( "newValue" -> col("s.newValue"))) .whenNotMatched().insertAll() .execute()
不使用schema推斷:update操作會拋出異常,因為newValue在目標表中并不存在。
使用schema推斷:update操作會拋出異常,因為newValue在目標表中并不存在。
對比四:
目標表(key,oldValue),源表(key,newValue),對源表執行下面的sql
targetDeltaTable.alias("t") .merge( sourceDataFrame.alias("s"), "t.key = s.key") .whenMatched().updateAll() .whenNotMatched().insert(Map( "key" -> col("s.key"), "newValue" -> col("s.newValue"))) .execute()
不使用schema推斷:insert操作會拋出異常,因為newValue在目標表中并不存在。
使用schema推斷:insert操作依然會拋出異常,因為newValue在目標表中并不存在。
5.性能調優
下面幾個方法可以有效減少merge的處理時間:
a.減少匹配查找的數據量
默認情況下,merge操作會掃描整個delta lake表找到滿足條件的數據。可以加些謂詞,以減少數據量。比如,數據是以country和date進行分區的,而你只想更新特定國家的昨天的數據。就可以增加一些條件,比如:
events.date = current_date() AND events.country = 'USA'
這樣就只會處理指定分區的數據,大大減少了數據掃描量。也可以避免不同分區之間操作的一些沖突。
b.合并文件
如果數據存儲的時候有很多小文件,就會降低數據的讀取速度。可以合并小文件成一些大文件,來提升讀取的速度。后面會說到這個問題。
c.控制shuffle的分區數
為了計算和更新數據,merge操作會對數據進行多次shuffle。shuffle過程中task數量是由參數spark.sql.shuffle.partitions來設置,默認是200。該參數不僅能控制shuffle的并行度,也能決定輸出的文件數。增加這個值雖然可以增加并行度,但也相應的增加了產生小文件數。
d.寫出數據之間進行重分區
對與分區表,merge操作會產生很多小文件,會比shuffle分區數多很多。原因是每個shuffle任務會為多分區表產生更多的文件,這可能會是一個性能瓶頸。所以,很多場景中使用表的分區列對數據進行寫入前重分區是很有效的。可以通過設置spark.delta.merge.repartitionBeforeWrite為true來生效。
關于delta lake的merge操作以及性能調優是怎樣的問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。