91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink與數據庫集成方法是什么

發布時間:2021-12-22 13:34:46 來源:億速云 閱讀:181 作者:iii 欄目:大數據

這篇文章主要介紹“Flink與數據庫集成方法是什么”,在日常操作中,相信很多人在Flink與數據庫集成方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink與數據庫集成方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

JDBC-Connector 的重構

JDBC Connector 在 Flink 1.11 版本發生了比較大的變化,我們先從以下幾個 Feature 來具體了解一下 Flink 社區在這個版本上對 JDBC 所做的改進。

  • FLINK-15782 :Rework JDBC Sinks[1] (重寫 JDBC Sink)


這個 issue 主要為 DataStream API 新增了 JdbcSink,對于使用 DataStream 編程的用戶會更加方便地把數據寫入到 JDBC;并且規范了一些命名規則,以前命名使用的是 JDBC 加上連接器名稱,目前命名規范為 Jdbc+ 連接器名稱

  • FLINK-17537:Refactor flink-jdbc connector structure[2] (重構 flink-jdbc 連接器的結構)


這個 issue 將 flink-jdbc 包名重命名為 flink-connector-jdbc,與 Flink 的其他 connector 統一,將所有接口和類從 org.apache.flink.java.io.jdbc(舊包)規范為新包路徑 org.apache.flink.connector.jdbc(新包),通過這種重命名用戶在對底層源代碼的閱讀上面會更加容易的理解和統一。

  • FLIP-95: New TableSource and TableSink interfaces[3] (新的 TableSource 和 TableSink 接口)


由于早期數據類型系統并不是很完善,導致了比較多的 Connector 在使用上會經常報數據類型相關的異常,例如 DECIMAL 精度類型,在以往的 Flink 1.10 版本中有可能出現下圖問題:

Flink與數據庫集成方法是什么


基于 FLIP-95 新的 TableSource 和 TableSink 在精度支持方面做了重構,目前數據精度方面的支持已經很完善了。

  • FLIP-122:New Connector Property Keys for New Factory[4](新的連接器參數)


在 Flink 1.11 版本中,我們對 DDL 的 WITH 參數相對于 1.10 版本做了簡化,從用戶視角看上就是簡化和規范了參數,如表格所示:

Old Key (Flink 1.10)    
New Key (Flink 1.11)    
connector.type    
connector.type    
connector.url    
url    
connector.table    
table-name    
connector.driver    
driver    
connector.username    
username    
connector.password    
password    
connector.read.partition.column    
scan.partition.column    
connector.read.partition.num    
scan.partition.num    
connector.read.partition.lower-bound    
scan.partition.lower-bound    
connector.read.partition.upper-bound    
scan.partition.upper-bound    
connector.read.fetch-size    
scan.fetch-size    
connector.lookup.cache.max-rows    
lookup.cache.max-rows    
connector.lookup.cache.ttl    
lookup.cache.ttl    
connector.lookup.max-retries    
lookup.max-retries    
connector.write.flush.max-rows    
sink.buffer-flush.max-rows    
connector.write.flush.interval    
sink.buffer-flush.interval    
connector.write.max-retries    
sink.max-retries    

大家可以看到表格中有 3 個標紅的地方,這個是相對于 1.10 有發生變化比較多的地方。這次 FLIP 希望進一步簡化連接器屬性,以便使屬性更加簡潔和可讀,并更好地與 FLIP-107 協作。如果需要了解更多的 Connector 參數可以進一步參考官方文檔和 FLIP-122 中提到的改變,這樣有助于從舊版本遷移到新版本并了解參數的變化。

  • FLIP-87:Primary key Constraints in Table API[5] (Table API 接口中的主鍵約束問題)


Flink 1.10 存在某些 Query 無法推斷出主鍵導致無法進行 Upsert 更新操作(如下圖所示錯誤)。所以在 FLIP-87 中為 Flink SQL 引入的 Primary Key 約束。Flink 的主鍵約束遵循 SQL 標準,主鍵約束分為 PRIMARY KEY NOT ENFORCED 和 PRIMARY KEY ENFORCED, ENFORCED 表示是否對數據進行校驗。我們常見數據庫的主鍵約束屬于 PRIMARY KEY ENFORCED,會對數據進行校驗。因為 Flink 并不持有數據,因此 Flink 支持的主鍵模式是 PRIMARY KEY NOT ENFORCED,  這意味著 Flink 不會校驗數據,而是由用戶確保主鍵的完整性。例如 HBase 里面對應的主鍵應該是 RowKey,在 MySQL 中對應的主鍵是在用戶數據庫的表中對應的主鍵。

Flink與數據庫集成方法是什么


JDBC Catalog


目前 Flink 支持 Catalog 主要有 JDBC Catalog 和 Hive Catalog 。在關系數據庫中的表,如果要在 Flink 中使用,用戶需要手動寫表的 DDL,一旦表的 Schema 發生改變,用戶需要手動修改, 這是比較繁瑣的事情。JDBC Catalog 提供了接口用于連接到各種關系型數據庫,使得 Flink 能夠自動檢索表,不用用戶手動輸入和修改。目前 JDBC Catalog 內置目前實現了 Postgres Catalog。Postgres catalog 是一個 read-only (只讀)的 Catalog,只支持讀取 Postgres 表,支持的功能比較有限。下面代碼展示了目前 Postgres catalog 支持的 6 個功能:數據庫是否存在、數據庫列表、獲取數據庫、根據數據庫名獲取表列表、獲得表、表是否存在。
// The supported methods by Postgres Catalog.PostgresCatalog.databaseExists(String databaseName)PostgresCatalog.listDatabases()PostgresCatalog.getDatabase(String databaseName)PostgresCatalog.listTables(String databaseName)PostgresCatalog.getTable(ObjectPath tablePath)PostgresCatalog.tableExists(ObjectPath tablePath)

如果需要支持其他 DB (如 MySQL),需要用戶根據 FLIP-93 的 JdbcCatalog 接口實現對應不同的 JDBC Catalog。

JDBC Dialect


什么是 Dialect?

Dialect (方言)對各個數據庫來說,Dialect 體現各個數據庫的特性,比如語法、數據類型等。如果需要查看詳細的差異,可以點擊這里[6]查看詳細差異。下面通過對比 MySQL 和 Postgres 的一些常見場景舉例:

Dialect    
MySQL    
Postgres    
場景描述    
Grammar(語法)    
LIMIT 0,30    
WITH LIMIT 30 OFFSET 0    
分頁    
Data Type (數據類型)    
BINARY    
BYTEA,ARRAY    
字段類型    
Command (命令)    
show tables    
\dt    
查看所有表    

在數據類型上面,Flink SQL 的數據類型目前映射規則如下:

MySQL type    
PostgreSQL type    
Flink SQL type    
TINYINT    

TINYINT    
SMALLINT    
TINYINT UNSIGNED    
SMALLINT    
INT2    
SMALLSERIAL    
SERIAL2    
SMALLINT    
INT    
MEDIUMINT    
SMALLINT    
UNSIGNED    
INTEGER    
SERIAL    
INT    
BIGINT    
INT    
UNSIGNED    
BIGINT    
BIGSERIAL    
BIGINT    
BIGINT    
UNSIGNED    

DECIMAL(20, 0)    

Flink 目前支持三種 Dialect: Derby、MySQL、PostgreSQL,Derby 主要用于測試,更多的類型映射可以點擊下方鏈接前往官方文檔查看。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping
如何保證 Dialect Upsert 的冪等性?

如果定義了主鍵,JDBC 寫入時是能夠保證 Upsert 語義的, 如果 DB 不支持 Upsert 語法,則會退化成 DELETE + INSERT 語義。Upsert query 是原子執行的,可以保證冪等性。這個在官方文檔中也詳細描述了更新失敗或者存在故障時候如何做出的處理,下面的表格是不同的 DB 對應不同的 Upsert 語法:

Database    
Upsert Grammar    
MySQL    
INSERT .. ON DUPLICATE KEY UPDATE ..    
PostgreSQL    
INSERT .. ON CONFLICT .. DO UPDATE SET ..    

如何自定義 Dialect?

目前如果要實現自定義 Dialect (比如 SQL Server、Oracle 等), 需要用戶自己實現對應 Dialect 修改源碼并重新打包 flink-connector-jdbc。社區正在討論提供一種插件化 dialect 的機制, 讓用戶可以不用修改源碼打包就能實現自定義 Dialect,這個機制需要把 Dialect 接口暴露給用戶。目前的 Dialect 接口不夠清晰,沒有考慮 DataStream API 的使用場景,也沒有考慮到 一些復雜的 SQL 場景,所以這個接口目前不太穩定(后續版本會修改) 。

社區目前之所以沒有把這個 API 開放給用戶,是從用戶使用的體驗角度考慮,希望把這種頂級 API 設計得盡量穩定、簡潔后再開放出來給用戶使用,避免用戶在后續 Flink 版本的迭代中多次修改代碼。目前社區已經有相應的計劃去做了,大家可以留意 FLINK-16833[7]  提出的 JDBCDialect 插件化設計。

實踐 Demo


大家看完上述 Flink 1.11 在 JDBC 所做的改動后,大家可以嘗試下面這個關于商品表 CDC 同步和 ETL 的小案例,有助于理解 JDBC Catalog 和 CDC 的同步機制。

環境與版本:Flink 1.11.1、Docker、Kafka 1.11.1、MySQL Driver 5.1.48、PostgreSQL Driver 42.2.14

流程如下:

  1. Flink standalone 環境準備并在提供的地址下載好對應的安裝包和 connector jar。
  2. 測試數據準備,通過拉起容器運行已經打包好的鏡像。其中 Kafka 中的 changelog 數據是通過 debezium connector 抓取的 MySQL orders表 的 binlog。
  3. 通過 SQL Client 編寫 SQL 作業,分別創建 Flink 訂單表,維表,用戶表,產品表,并創建 Function UDF。從 PG Catalog 獲取結果表信息之后,把作業提交至集群執行運行。
  4. 測試 CDC 數據同步和維表 join,通過新增訂單、修改訂單、刪除訂單、維表數據更新等一系列操作驗證 CDC 在 Flink 上如何運行以及寫入結果表。

Flink與數據庫集成方法是什么


上圖為業務流程整體圖,項目 Demo 地址:

https://github.com/leonardBang/flink-sql-etl

問答環節


1.Flink SQL Client 上面執行的 use default,是使用哪個 catlog 呢?

答:Flink 內部有一個內置 Catlog,它把 meta 數據存于內存中。在 SQL Client 上沒有顯式指定 Hive catlog 或者 jdbc catlog 時會使用內置的 Catalog,剛剛的案例給大家演示的是 Postgres Catalog,里面有結果表。在內置 Catlog 可以看到我們剛剛創建 Kafka 的表,MySQL 的維度表。

2.Flink MySQL DDL 連接 8 小時后就會自動斷開的問題是否已經解決?

這個問題會在 1.12 版本解決此問題,目前 master 分支已經合并,具體可以參考以下地址  ,描述了相關問題的討論和解決辦法。

3.什么是 CDC?能大概講下目前 Flink 支持的 CDC 嗎?

通過 Change Data Capture 機制(CDC)來將外部系統的動態數據(如 Mysql BinLog、Kafka Compacted Topic)導入 Flink,以及將 Flink 的 Update/Retract 流寫出到外部系統中是用戶一直希望的功能。

Flink 1.11 實現了對 CDC 數據讀取和寫出的支持。目前 Flink 可以支持 Debezium(Demo 中所用的工具) 和 Canal(阿里巴巴開源同步工具) 兩種 CDC 格式。Debezium 在國外用得比較多,Canal 在國內用得比較多,兩者格式會有所區別,詳細可以參考官方使用文檔。

到此,關于“Flink與數據庫集成方法是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

麻阳| 长治市| 天水市| 岳池县| 西乌珠穆沁旗| 西林县| 桐城市| 南皮县| 新和县| 平度市| 和硕县| 郎溪县| 梧州市| 神木县| 顺义区| 林甸县| 兴城市| 含山县| 宁波市| 张家界市| 奉节县| 延边| 南涧| 牟定县| 青海省| 灵武市| 宜宾市| 汝州市| 洛宁县| 正定县| 理塘县| 连南| 长丰县| 莆田市| 若尔盖县| 洞头县| 隆德县| 襄樊市| 科尔| 南康市| 宜宾市|