This article explains in detail how to replicate Oracle data in real time to a Kafka message queue with Oracle GoldenGate (OGG). It is a step-by-step walkthrough with concrete commands, so it should be a useful reference for anyone building a similar pipeline.
Component versions
Component | Version | Description
Source Oracle | Oracle 11.2.0.4 for Linux x64 | Source database
Source OGG | Oracle OGG 11.2.0.1.20 for Oracle Linux x64 | Captures change data from the source Oracle database and ships the change logs to the target
Target Kafka | kafka_2.11-0.11.0.2 for Linux x64 | Message queue that receives the data pushed by the target-side OGG
Target OGG | | Receives the Oracle transaction change logs sent from the source and pushes the changes into the Kafka message queue
1. OGG Manager
The OGG Manager configures and manages the other OGG components: it sets up extraction, pumping, and replication, starts and stops the related processes, and reports their status.
2. Extract
Extract captures changes (DML and DDL) from the source database. Extraction comes in several types:
Local extract: captures incremental change data from the local database and writes it to local trail files.
Data pump: reads local trail files and pushes the data to the target.
Initial-load extract: exports full data from database tables for the first load.
3. Data Pump
A data pump is a special type of Extract that reads data from local trail files and sends it over the network to the target-side OGG.
4. Trail files
The transaction changes that Extract captures from the source database are written to trail files.
5. Collector
The collector runs on the target machine; it receives the trail data sent over by the data pump and writes it into local trail files.
6. Replicat
Replicat runs on the target machine; it reads data changes from trail files and applies them to the target data store. In this walkthrough, Replicat pushes the data into a Kafka message queue.
7. Checkpoint
Checkpoints record the current read and write positions of each process (in the transaction log and in the trail files), so that a process can resume from the right place after a restart.
Source Oracle database configuration
1) Enable archiving on the source
SQL> archive log list
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /u01/app/oracle/product/11.2.3/db_1/dbs/arch
Oldest online log sequence 12
Next log sequence to archive 17
Current log sequence 17
If archiving is not enabled, turn it on as follows:
conn / as sysdba (connect as SYSDBA)
shutdown immediate (shut down the database immediately)
startup mount (start the instance and mount the database without opening it)
alter database archivelog; (switch the database to ARCHIVELOG mode)
alter database open; (open the database)
alter system archive log start; (enable automatic archiving)
2) OGG relies on force logging and supplemental logging to capture complete transaction data in real time, so check whether they are enabled:
SQL> select force_logging, supplemental_log_data_min,supplemental_log_data_all from v$database;
FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
---------- ------------------------
YES YES
If supplemental logging is not enabled, turn it on:
SQL> alter database force logging;
SQL> alter database add supplemental log data;
SQL> alter database add supplemental log data (all) columns;
3) Enable the GoldenGate replication parameter
SQL> alter system set enable_goldengate_replication = true;
4) Create the OGG account on the source Oracle database
SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;
SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
SQL> grant dba to ggsadmin;
5) Create a test table (omitted in production)
SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;
SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
----------
436
Deploying OGG
Source side (Oracle)
1) Unpack
Create the OGG directory first:
mkdir -p /ogg
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /ogg
chown -R oracle:oinstall /ogg (give the oracle user ownership; some later steps must run as oracle to succeed)
2) Configure the OGG environment variables
For simplicity, it is recommended to set these in the oracle user's environment file, /home/oracle/.bash_profile:
export JAVA_HOME=/usr/local/java1.8
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
export JAVA=$JAVA_HOME/bin/java
export OGG_HOME=/ogg
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so
Reload the environment:
source /home/oracle/.bash_profile
3) Initialize OGG
Run ggsci and create the working subdirectories:
ggsci
Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
GGSCI (ambari.master.com) 1> create subdirs
Creating subdirectories under current directory /root
Parameter files /root/dirprm: created
Report files /root/dirrpt: created
Checkpoint files /root/dirchk: created
Process status files /root/dirpcs: created
SQL script files /root/dirsql: created
Database definitions files /root/dirdef: created
Extract data files /root/dirdat: created
Temporary files /root/dirtmp: created
Stdout files /root/dirout: created
4) Configure the source-side Manager
GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle
GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals
-- add:
oggschema ggsadmin
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr
-- add:
PORT 7810 -- default listening port
DYNAMICPORTLIST 7811-7820 -- dynamic port range
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3 -- if a process fails, retry every 3 minutes, up to 5 restarts
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7 -- purge trail files older than 7 days once checkpoints allow
LAGREPORTHOURS 1 -- check and report transfer lag every hour
LAGINFOMINUTES 30 -- write an informational message when lag exceeds 30 minutes
LAGCRITICALMINUTES 45 -- write a critical message when lag exceeds 45 minutes
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 -- periodically purge marker history
--ACCESSRULE, PROG *, IPADDR 172.*, ALLOW -- uncomment to allow connections from the 172 network
5) Enable table-level supplemental logging for the table to replicate
GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk
Target side (Kafka)
1) Unpack
mkdir -p /ogg
unzip V839824-01.zip
tar xf ggs_Adapters_Linux_x64.tar -C /ogg/
2) Configure the OGG environment variables
For simplicity, it is recommended to set these in the oracle user's environment file, /home/oracle/.bash_profile:
export JAVA_HOME=/usr/local/java1.8/jre
export PATH=$JAVA_HOME/bin:$PATH
export LD_LIBRARY_PATH=$JAVA_HOME/lib/amd64/server:$JAVA_HOME/lib/amd64:$LD_LIBRARY_PATH
export OGG_HOME=/ogg
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so
Reload the environment:
source /home/oracle/.bash_profile
3) Initialize OGG
Run ggsci and create the working subdirectories:
ggsci
Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
GGSCI (ambari.master.com) 1> create subdirs
Creating subdirectories under current directory /root
Parameter files /root/dirprm: created
Report files /root/dirrpt: created
Checkpoint files /root/dirchk: created
Process status files /root/dirpcs: created
SQL script files /root/dirsql: created
Database definitions files /root/dirdef: created
Extract data files /root/dirdat: created
Temporary files /root/dirtmp: created
Stdout files /root/dirout: created
4) Configure the target-side Manager
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr
-- add:
PORT 7810 -- default listening port
DYNAMICPORTLIST 7811-7820 -- dynamic port range
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3 -- if a process fails, retry every 3 minutes, up to 5 restarts
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7 -- purge trail files older than 7 days once checkpoints allow
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 -- periodically purge marker history
--ACCESSRULE, PROG *, IPADDR 172.*, ALLOW -- uncomment to allow connections from the 172 network
GGSCI (172-16-101-242) 4> edit param ./GLOBALS
-- add:
CHECKPOINTTABLE ggsadmin.checkpoint
Full initial load (Oracle to Kafka)
1. Configure the source-side initial load
1) Add the source-side initial-load Extract
GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable
2) Configure the initial-load Extract parameters
GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk
EXTRACT initkfk
SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
USERID ggsadmin,PASSWORD oracle
RMTHOST 172.16.101.242, MGRPORT 7810
RMTFILE ./dirdat/ek,maxfiles 999, megabytes 500
table baiyang.ora_to_kfk;
3) Create the defgen parameter file for the table definitions on the source
GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk
-- add:
defsfile /ogg/dirdef/define_kfk.txt
userid ggsadmin,password oracle
table baiyang.ora_to_kfk;
4) Run defgen to generate the table-definitions file
$cd /ogg
$./defgen paramfile dirprm/define_kfk.prm
-- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt
5) Ship the definitions file to the target
Copy the file into the target's dirdef directory:
scp /ogg/dirdef/define_kfk.txt 172.16.101.242:/ogg/dirdef/define_kfk.txt
2. Configure the target-side initial load
1) Add the target-side initial-load Replicat
GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun
2) Configure the initial-load Replicat parameters
GGSCI (172-16-101-242) 6> edit params initkfk
-- add:
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
targetdb libfile libggjava.so set property=./dirprm/kafka.props
SOURCEDEFS ./dirdef/define_kfk.txt
REPLACEBADCHAR SKIP
SOURCECHARSET OVERRIDE ISO-8859-1
EXTFILE ./dirdat/ek
reportcount every 1 minutes, rate
grouptransops 10000
map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;
3) Configure the OGG Kafka handler
vi ./dirprm/kafka.props
# add:
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# topicName is the pre-12.3 parameter name and is what this walkthrough uses;
# newer releases use topicMappingTemplate instead
gg.handler.kafkahandler.topicName=test_ogg
#gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/kafka/libs/*:/ogg/:/ogg/lib/*
(Here /kafka/libs is the Kafka installation's library directory and /ogg is the OGG installation; adjust gg.classpath to match your paths.)
Copy ./dirprm/kafka.props into the /ogg/AdapterExamples/big-data/kafka directory.
vi ./dirprm/custom_kafka_producer.properties
# Kafka broker address
bootstrap.servers=172.16.101.242:9092
acks=-1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# batch up to 100 KB per partition before sending, so messages may accumulate
batch.size=102400
# wait up to 10 s for a batch to fill; this delays delivery into Kafka
linger.ms=10000
Copy ./dirprm/custom_kafka_producer.properties into /ogg/AdapterExamples/big-data/kafka as well.
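The two files above are standard Java-style properties files. As a quick sanity check before handing them to OGG, a few lines of plain Python (standard library only; the file content below is copied from this walkthrough) show how such a file maps to key/value pairs:

```python
def parse_properties(text):
    """Parse a Java-style .properties file into a dict.

    Lines starting with '#' or '!' are comments; blank lines are skipped;
    each remaining line is split on the first '='.
    """
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line[0] in "#!":
            continue
        if "=" in line:
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

# The producer settings used in this walkthrough:
conf = parse_properties("""
bootstrap.servers=172.16.101.242:9092
acks=-1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
""")
print(conf["acks"])       # -1: wait for all in-sync replicas
print(conf["linger.ms"])  # 10000: up to 10 s of batching delay
```

The same function reads kafka.props, whose comment lines also start with #, which is why the inline notes above were moved onto their own # lines.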
3. Run the initial load
Source side:
GGSCI (dtproxy) 20> start mgr
GGSCI (dtproxy) 21> start initkfk
Target side: apply the full data set
GGSCI (172-16-101-242) 13> start mgr
cd /ogg
./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD
-- check the apply report for errors
cd /ogg/dirrpt
more init01.rpt
4. Verify the full load in Kafka
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg --from-beginning
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}
The full data set has now been replicated to the Kafka topic test_ogg.
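Each record on the topic is a self-describing JSON document. Beyond the console consumer, a few lines of Python (standard library only; the sample record is one of those shown above) can decode a record into its operation type and row image:

```python
import json

def parse_ogg_record(raw):
    """Decode one OGG-for-Big-Data JSON record (format=json, mode=op).

    Returns (table, op_type, row): the 'after' image for inserts and
    updates, the 'before' image for deletes.
    """
    rec = json.loads(raw)
    row = rec.get("after") if rec["op_type"] in ("I", "U") else rec.get("before")
    return rec["table"], rec["op_type"], row

# One of the records captured from the test_ogg topic above:
raw = ('{"table":"BAIYANG.ORA_TO_KFK","op_type":"I",'
       '"op_ts":"2019-11-11 20:23:19.703779",'
       '"current_ts":"2019-11-11T20:48:55.946000",'
       '"pos":"-0000000000000000001",'
       '"after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,'
       '"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}')

table, op, row = parse_ogg_record(raw)
print(table, op, row["OBJECT_ID"])  # BAIYANG.ORA_TO_KFK I 2
```

The pos field is useful for ordering or de-duplicating downstream: the initial-load records above all carry pos -0000000000000000001, while incremental records carry real trail positions.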
Incremental replication (Oracle to Kafka)
Source-side configuration
1) Configure the source-side Extract
GGSCI (dtproxy) 9> edit param extkfk
-- add:
extract extkfk
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ggsadmin,password oracle
FETCHOPTIONS NOUSESNAPSHOT
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
exttrail ./dirdat/to
table baiyang.ora_to_kfk;
2) Add the Extract process and its local trail
GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now
GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk
3) Configure the source-side data pump
GGSCI (dtproxy) 12> edit param pupkfk
-- add:
extract pupkfk
passthru
dynamicresolution
userid ggsadmin,password oracle
rmthost 172.16.101.242 mgrport 7810
rmttrail ./dirdat/to
table baiyang.ora_to_kfk;
4) Add the pump process and its remote trail
GGSCI (dtproxy) 13> add extract pupkfk,exttrailsource ./dirdat/to
GGSCI (dtproxy) 14> add rmttrail ./dirdat/to,extract pupkfk
Target-side configuration
1) Configure the target-side Replicat
edit param repkfk
-- add:
REPLICAT repkfk
SOURCEDEFS ./dirdef/define_kfk.txt
targetdb libfile libggjava.so set property=./dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;
2) Bind the Replicat to the trail file and checkpoint table
add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint
Start real-time incremental capture
Source side:
./ggsci
GGSCI (dtproxy) 5> start extkfk
Sending START request to MANAGER ...
EXTRACT EXTKFK starting
GGSCI (dtproxy) 6> start pupkfk
Sending START request to MANAGER ...
EXTRACT PUPKFK starting
GGSCI (dtproxy) 7> status all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTKFK 00:00:00 00:00:10
EXTRACT RUNNING PUPKFK 00:00:00 00:00:00
Target side:
./ggsci
GGSCI (172-16-101-242) 7> start replicat repkfk
Sending START request to MANAGER ...
REPLICAT REPKFK starting
GGSCI (172-16-101-242) 8> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REPKFK 00:00:00 00:00:00
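The `info all` output is a fixed-width text table, so a monitoring script can parse it and alert on any process that is not RUNNING. A sketch in plain Python, assuming the column layout shown in the transcripts above:

```python
def parse_info_all(output):
    """Parse the table printed by GGSCI 'info all' into a list of dicts."""
    rows = []
    for line in output.splitlines():
        parts = line.split()
        if parts and parts[0] in ("MANAGER", "EXTRACT", "REPLICAT"):
            rows.append({
                "program": parts[0],
                "status": parts[1],
                # MANAGER has no group name; capture one when present
                "group": parts[2] if len(parts) > 2 else None,
            })
    return rows

sample = """\
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     EXTKFK      00:00:00      00:00:10
EXTRACT     RUNNING     PUPKFK      00:00:00      00:00:00
"""
bad = [r for r in parse_info_all(sample) if r["status"] != "RUNNING"]
print(bad)  # [] -> everything healthy
```

The same parser flags the ABENDED Replicat shown later in the DDL section, which is the situation an alerting script most needs to catch.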
Test incremental capture
Source side:
Insert incremental data into Oracle:
SQL> insert into baiyang.ora_to_kfk select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and object_id < 1000;
SQL> commit;
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
----------
905
Target side:
Consume the Kafka topic to check the data:
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}
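A downstream consumer usually replays these change records into its own store. The sketch below keys rows by the table's primary key OBJECT_ID, upserting insert/update after-images and removing deleted keys; the trimmed-down records in the example are hypothetical, reduced to just the fields the logic touches:

```python
import json

def apply_change(state, raw):
    """Replay one OGG JSON change record into a dict keyed by OBJECT_ID.

    Inserts/updates upsert the 'after' image; deletes drop the key.
    (A sketch: assumes format=json, mode=op, and the ORA_TO_KFK schema
    used throughout this walkthrough.)
    """
    rec = json.loads(raw)
    if rec["op_type"] in ("I", "U"):
        state[rec["after"]["OBJECT_ID"]] = rec["after"]
    elif rec["op_type"] == "D":
        state.pop(rec["before"]["OBJECT_ID"], None)
    return state

state = {}
# Hypothetical trimmed records: an insert followed by its delete
apply_change(state, '{"table":"BAIYANG.ORA_TO_KFK","op_type":"I",'
                    '"after":{"OBJECT_ID":998,"OBJECT_NAME":"APPLY$_READER_STATS"}}')
apply_change(state, '{"table":"BAIYANG.ORA_TO_KFK","op_type":"D",'
                    '"before":{"OBJECT_ID":998}}')
print(len(state))  # 0: the insert was cancelled by the delete
```

With mode=op each Kafka message carries exactly one operation, so applying messages in partition order like this keeps the local copy consistent with the source table.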
DDL operations
On the OGG source (Oracle) side, any change to a table's definition counts as DDL: adding, dropping, or altering columns, and so on. The stable OGG for Big Data 12.2 release does not support replicating DDL statements; DDL support arrives in 12.3 and later.
With OGG for Big Data 12.2, after DDL runs on the source you must regenerate the define_kfk.txt table-definitions file on the source and transfer it to the target again.
An example:
Source side (Oracle):
1) Add an id column to the source table
alter table ORA_TO_KFK add id number;
2) Regenerate the table-definitions file on the OGG source
mv /ogg/dirdef/define_kfk.txt /ogg/dirdef/define_kfk.txt.bak1
cd /ogg
./defgen paramfile dirprm/define_kfk.prm
3) Copy the new definitions file to the target
cd /ogg
scp ./dirdef/define_kfk.txt root@192.168.56.57:/ogg/dirdef/
4) Restart the source-side Extract
GGSCI (edsir1p9) 2> stop EXTKFK
Sending STOP request to EXTRACT EXTKFK ...
Request processed.
GGSCI (edsir1p9) 3> start EXTKFK
Sending START request to MANAGER ...
EXTRACT EXTKFK starting
GGSCI (edsir1p9) 4> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTKFK 00:00:00 00:00:08
EXTRACT RUNNING PUPKFK 00:00:00 00:00:07
Target side (Kafka):
1) The target-side Replicat has abended:
GGSCI (node) 38> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT ABENDED REPKFK 00:10:27 00:05:29
2) Restart the Replicat
GGSCI (node) 40> start REPKFK
Sending START request to MANAGER ...
REPLICAT REPKFK starting
GGSCI (node) 9> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REPKFK 00:00:00 00:00:04
Test:
Insert a row on the source:
SQL> insert into ORA_TO_KFK(OWNER,OBJECT_NAME,OBJECT_ID,ID) values ('gg','gg',876,9);
1 row created.
SQL> commit;
Target side:
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.57:9092 --topic ogg_test
The row has been replicated from the source Oracle database to the target Kafka topic; the newly added column is synchronized correctly.