您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“Kafka Connect處理更新和刪除的方法”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Kafka Connect處理更新和刪除的方法”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
Kafka Connect 是一款出色的工具,可讓您輕松設置從一個數據源到目標數據庫的連續數據流。它的配置非常簡單,當您有遺留系統為您需要的業務數據提供服務時,出于某種原因或其他原因,它在不同的地方非常有用。我的典型用例是將數據從 Oracle 表移動到微服務使用的 MongoDB 集合。這允許更好的可擴展性,因為我們不必使用生產查詢大量訪問源表。
當您打開 Kafka Connect 手冊時,不容易解釋的一件事是如何處理修改已移動的現有數據的操作;或者換句話說,更新和刪除。我認為這是我們使用的典型 JDBC/MongoDB 連接器對的限制。有一段時間我探索了 Debezium 連接器,它承諾捕獲這些類型的事件并將它們復制到目標數據庫中。使用 OracleDB 的 POC 對我們來說并不成功。我們對這些數據庫的訪問有限,而且這些連接器所需的配置級別并不是一個簡單的解決方案。
當我們繼續使用連接器時,我們發現有一些方法可以處理這些場景。我將解釋兩種策略。第一個是最理想的,需要在我們的源數據庫中進行特定設計。如果該設計不存在且因任何原因無法更改,則第二個是替代解決方案。
假設我們有一個處理促銷活動的舊系統。為了簡化我們的示例,假設我們有一個包含三列的基本表。我們需要不斷地將這些數據從 SQL 數據庫移動到基于文檔的數據庫,如 MongoDB。
首先,我們需要對可以使用的兩種 Kafka 連接器進行快速描述:增量和批量。嚴格來說,JDBC連接器有四種模式:bulk、timestamp、incrementing、timestamp+incrementing。我將最后三個分組為增量,因為它們共享相同的基本概念。您只想移動從源中檢測到的新數據。
批量連接器始終移動整個數據集。但是,很大程度上取決于我們正在移動的數據的用例。理想情況下,增量連接器是最好的解決方案,因為在資源使用或數據準備方面更容易管理小塊新數據。這里的問題是:Kafka Connect 如何使用純 SQL 查詢,以及它如何知道何時在源中插入了新數據?
源連接器配置可以使用以下兩個屬性之一(或兩者):incrementing.column.name 和 timestamp.column.name。Incrementing 屬性使用增量列(如自動生成的 id)來檢測何時插入新行。Timestamp 屬性使用 DateTime 列來檢測新更改。Kafka Connect 持有一個偏移量,將其附加到用于從源獲取數據的 SQL 查詢中。
例如,如果我們的表名為“promotions”,我們將在源連接器的查詢屬性中使用,如下所示:
"query": "SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS",
"timestamp.column.name": "LAST_UPDATE_DATE"
Kafka 內部將查詢修改為如下所示:
SELECT * FROM ( SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS)
WHERE LAST_UPDATE_DATE > {OFFSET_DATE}
在接收器連接器端,即在目標數據庫中保存數據的連接器,我們需要設置一個策略來根據 ID 進行正確的 upsert。您可以在您使用的接收器連接器的文檔中閱讀更多相關信息。對于 MongoDB 連接器,我使用的典型設置是:
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy",
這表明我們文檔的 _id 將來自源數據。在這種情況下,我們的源查詢應該包含一個 _id 列:
"query": "SELECT PROMO_ID as \"_id\", TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS"
至此,我們有了檢測新插入的基本配置。每次添加帶有新時間戳的新促銷時,源連接器都會抓取它并將其移動到所需的目的地。但是有了這個完全相同的配置,我們就可以實現檢測更新和刪除的總目標。我們需要的是正確設計我們的數據源。
如果我們想確保我們的更新被處理并反映在目標數據庫中,我們需要確保在源表中進行的每個更新也更新時間戳列值。這可以通過寫入它的應用程序將當前時間戳作為更新操作的參數來完成,或者創建一個監聽更新事件的觸發器。由于 sink 連接器根據 id 處理 upsert,更新也會反映在目標文檔中。
為了能夠處理刪除,我們需要前面的步驟以及數據庫設計中被認為是好的做法:軟刪除。這種做法是在需要時不刪除(硬刪除)數據庫中的記錄,而只是用一個特殊的標志來標記它,表明該記錄不再有效/活動。這在可恢復性或審計方面有其自身的好處。這當然意味著我們的應用程序或存儲過程需要了解這種設計并在查詢數據時過濾掉不活動的記錄。
如果很難更新刪除記錄的應用程序來進行軟刪除(以防數據源的設計沒有考慮到這一點),我們還可以使用觸發器來捕獲硬刪除并改為進行軟刪除。
為了我們的 Kafka Connect 目的,我們需要做的是在記錄被標記為非活動時更改我們的時間戳列值。在此示例中,我們將 HOT SUMMER 促銷設置為非活動,將 ACTIVE 列設置為 0。LAST_UPDATE_DATE 還修改為最近的日期,這將使源連接器獲取記錄。
當數據被移動時,例如移動到 MongoDB,為了使用它,我們還需要根據這個 ACTIVE 字段進行過濾:
db.getCollection('promotions').find({active: 1})
如果我們必須處理不可更改的設計,則可以使用的最后一種方法選項不允許修改源模式以具有時間戳列或活動標志。這個選項有我所說的版本化批量。正如我之前所解釋的,每次調用時,批量連接器都會移動整個數據集。在大多數情況下,我遇到過增量更新總是更可取的做法,但在這種情況下,我們可以利用批量選項。
由于我們需要跟蹤新插入、更新或刪除的內容,因此我們可以每次移動數據,添加一個額外的列來標識數據的快照。我們還可以使用查詢數據時的時間戳。由于時間戳是自然后代排序的值,如果我們想要最新的快照,我們可以很容易地通過最后一個或倒數第二個(我將解釋為什么這可能更好)一旦數據移動到目標位置的快照進行過濾。
Oracle 中的查詢如下所示:
"query": "SELECT PROMO_ID as \"_id\", TITLE, DISCOUNT, PRODUCT_CATEGORY,
TO_CHAR(SYSDATE, 'yyyymmddhh34miss') AS SNAPSHOT FROM PROMOTIONS"
這種方法需要一些配置,這些配置對于使用最終數據集時的正確性能至關重要。您可以想象,索引在這里很重要,更重要的是,在新的快照列中。另一個重要的考慮因素是消耗的空間。根據每個快照中的記錄數量,我們可能需要刪除舊版本。我們可以為此使用一些計劃任務,或者像使用 MongoDB 索引一樣配置 TTL。
在使用數據時,我們首先需要獲取最新的快照。我提到倒數第二個可能更好。原因是最新的可能是正在進行的。換句話說,當您執行查詢以使用數據時,數據可能會移動。如果您對目標數據庫的查詢是任何類型的聚合,您可能會得到不完整的結果。因此,對于最新的快照,我們不確定它是否處于準備好使用的狀態。如果我們抓取倒數第二個,我們可以確定快照是完整的。
在下一個示例中,移動了數據的兩個版本。版本 2021073012000包含三個文檔。較新的版本2021080112000有兩個文檔,一個文
讀到這里,這篇“Kafka Connect處理更新和刪除的方法”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。