How to Send Oracle Data to Kafka

Published: 2021-08-18 20:03:48  Source: 億速云  Views: 184  Author: chen  Category: Network Security

This article explains how to send Oracle data to Kafka using the Oracle GoldenGate (OGG) adapter for Kafka. It walks through the OGG configuration on the source and target sides, the Kafka setup, and a simple send/receive test.

Configuration
OGG ADAPTER FOR KAFKA
Required Kafka jars:
Kafka 0.8.2.1
kafka-clients-0.8.2.1.jar
lz4-1.2.0.jar
slf4j-api-1.7.6.jar
snappy-java-1.1.1.6.jar

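As a quick sanity check of the jar list above, the following minimal sketch reports which of those jars are absent from a directory. The function name `missing_jars` is hypothetical and not part of OGG or Kafka; the jar names are copied from the list above.

```python
from pathlib import Path

# Jar names copied from the list above; adjust versions to match your install.
REQUIRED_JARS = [
    "kafka-clients-0.8.2.1.jar",
    "lz4-1.2.0.jar",
    "slf4j-api-1.7.6.jar",
    "snappy-java-1.1.1.6.jar",
]


def missing_jars(lib_dir, required=REQUIRED_JARS):
    """Return the required jar names that are not present in lib_dir."""
    present = {p.name for p in Path(lib_dir).glob("*.jar")}
    return [name for name in required if name not in present]
```

For example, `missing_jars("/data/kafka_2.10-0.8.2.2/libs")` would check the Kafka libs directory used later in this article. Note that the directory listing shown later contains kafka-clients-0.8.2.2.jar and snappy-java-1.1.1.7.jar, so an exact-name check would flag those two; the original does not say whether that version difference matters.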
################################################################### Configure OGG
Source (primary) database

dblogin userid goldengate, password oggpassword
add extract EXTJMS,tranlog, threads 3,begin now
add exttrail /data/oggsrc/dirdat/jm, extract EXTJMS megabytes 200

add trandata testKAFKA.* --add schematrandata testKAFKA

Extract process:
edit param extjms
EXTRACT EXTJMS
SETENV (ORACLE_SID = "rac3")
SETENV (ORACLE_HOME=/data/app/oracle/product/10.2.0/db_1)
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
DBOPTIONS ALLOWUNUSEDCOLUMN, FETCHBATCHSIZE 1500
userid goldengate, password oggpassword
EXTTRAIL /data/oggsrc/dirdat/jm, FORMAT RELEASE 9.5
DISCARDFILE /data/oggsrc/dirtmp/EXTJMS.dsc, APPEND, MEGABYTES 500
tranlogoptions asmuser SYS@rac_asm, ASMPASSWORD oracle_123
THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000
WARNLONGTRANS 30MIN, CHECKINTERVAL 3MIN
CHECKPOINTSECS 5
FLUSHCSECS 80
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RecoveryOptions OverwriteMode
--DDL INCLUDE ALL
DDL INCLUDE MAPPED &
exclude objname testKAFKA.PK_CATEGORY_RANKLIST & 
exclude objtype 'PACKAGE' &
exclude objtype 'PACKAGE BODY' &
exclude INSTR 'REPLACE SYNONYM' &
exclude INSTR 'CREATE OR REPLACE PACKAGE' &
exclude objtype 'PROCEDURE' &
exclude objtype 'FUNCTION' &
exclude objtype 'TYPE' &
exclude objtype 'TRIGGER' &
exclude objtype 'GRANT' &
exclude instr 'GRANT' &
exclude objtype 'DATABASE LINK' &
exclude objtype 'CONSTRAINT' &
exclude objtype 'JOB' &
exclude instr 'ALTER SESSION' &
exclude instr 'MATERIALIZED VIEW'  &
exclude INSTR 'AS SELECT' &
exclude INSTR 'REPLACE SYNONYM' &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_CMP" &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_UNCMP" 
--GETUPDATEBEFORES
--ddloptions addtrandata,REPORT
FETCHOPTIONS, USESNAPSHOT, NOUSELATESTVERSION, MISSINGROW REPORT
dynamicresolution
EOFDELAYCSECS 5
TABLEEXCLUDE testKAFKA.RULE_ACTION_LOG;
TABLE testKAFKA.* ;

Enable supplemental logging on the source database:
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;
Database altered.
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (all) COLUMNS;
Database altered.
SQL> select SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI ,FORCE_LOGGING from v$database;
SUPPLEME SUP SUP FOR
-------- --- --- ---
YES      YES YES YES
SQL>

Add a new pump process on the source side (tested on the testKAFKA source database):
add extract EDPKK,exttrailsource /data/oggsrc/dirdat/jm, begin now
edit param EDPKK
EXTRACT EDPKK
setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)
PASSTHRU
GETUPDATEBEFORES
RecoveryOptions OverwriteMode
RMTHOST 192.168.0.3, MGRPORT 7839
RMTTRAIL /data/ogg_for_bigdata/dirdat/kk
RMTTRAIL /data/ogg_for_kafka/dirdat/kk
DISCARDFILE ./dirrpt/EDPKK.dsc,APPEND,MEGABYTES 5
TABLE testKAFKA.* ;

add rmttrail /data/ogg_for_bigdata/dirdat/kk, extract EDPKK megabytes 200
add rmttrail /data/ogg_for_kafka/dirdat/kk,extract EDPKK megabytes 200

Edit the definitions file (defgen.prm):
userid goldengate, password oggpassword
defsfile dirdef/testKAFKA.def
TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*;                
TABLE testKAFKA.*;

Generate and transfer the definitions file:
./defgen paramfile ./dirprm/defgen.prm
 cd dirdef/
[oracle@vm01 dirdef]$ scp  dirdef/testKAFKA.def oracle@192.168.0.3:/data/ogg_for_bigdata/dirdef
The authenticity of host '192.168.0.3 (192.168.0.3)' can't be established.
RSA key fingerprint is 46:8c:35:61:74:ca:43:e0:b0:74:d5:ff:0c:2f:67:8a.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '192.168.0.3' (RSA) to the list of known hosts.
reverse mapping checking getaddrinfo for bogon failed - POSSIBLE BREAK-IN ATTEMPT!
oracle@192.168.0.3's password: 
testKAFKA.def                                                                                                                  100%  899KB 898.7KB/s   00:00    
[oracle@vm01 dirdef]$

Target side
mgr:
PORT 7839
DYNAMICPORTLIST 7840-7850
--AUTOSTART replicat *
--AUTORESTART replicat *,RETRIES 5,WAITMINUTES 2
AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10
PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
PURGEOLDEXTRACTS /data/ogg_for_kafka/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
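The lag thresholds in the Manager parameters above can be read as a simple classification: lag at or above LAGINFOMINUTES is reported as informational, and lag at or above LAGCRITICALMINUTES as critical. The sketch below only illustrates that reading; the function name `classify_lag` is hypothetical, and treating the boundaries as inclusive is an assumption rather than documented OGG behavior.

```python
def classify_lag(lag_minutes, info_minutes=30, critical_minutes=45):
    """Classify a lag value against the thresholds from the mgr parameters.

    Defaults mirror LAGINFOMINUTES 30 and LAGCRITICALMINUTES 45 above.
    Inclusive boundaries are an assumption made for this illustration.
    """
    if lag_minutes >= critical_minutes:
        return "critical"
    if lag_minutes >= info_minutes:
        return "info"
    return "ok"
```

With the values above, a lag of 10 minutes would be classified as "ok", 35 minutes as "info", and 50 minutes as "critical".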

Add the UE (user exit) data pump:
Version used:
Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209
On the target side, new port 7889:
ADD EXTRACT repkk, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kk

ADD EXTRACT repkk, EXTTRAILSOURCE /data/ogg_for_kafka/dirdat/kk
edit param repkk
GGSCI (localhost.localdomain) 18> view param repkk
EXTRACT repkk
SETENV (GGS_USEREXIT_CONF ="dirprm/repkk.props")
GetEnv (JAVA_HOME)
GetEnv (PATH)
GetEnv (LD_LIBRARY_PATH)
SourceDefs dirdef/testKAFKA.def
CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores
GetUpdateBefores
NoCompressDeletes
NoCompressUpdates
NoTcpSourceTimer
TABLEEXCLUDE testKAFKA.MV*;
TABLE testKAFKA.*;
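The TABLEEXCLUDE / TABLE pair above selects every table in the testKAFKA schema except those whose names start with MV. A rough way to preview that selection is sketched below using shell-style wildcards. This is only an approximation: `fnmatchcase` is case-sensitive and is not OGG's own matcher, and the function name and sample table names are hypothetical.

```python
from fnmatch import fnmatchcase


def selected_tables(tables, include="testKAFKA.*", excludes=("testKAFKA.MV*",)):
    """Return tables matching the include pattern and none of the excludes.

    Approximates TABLE / TABLEEXCLUDE wildcards with shell-style patterns.
    """
    return [
        t for t in tables
        if fnmatchcase(t, include)
        and not any(fnmatchcase(t, pattern) for pattern in excludes)
    ]
```

Given the hypothetical list `["testKAFKA.ORDERS", "testKAFKA.MV_SALES", "other.T1"]`, only `testKAFKA.ORDERS` is kept: the MV table is excluded and `other.T1` does not match the include pattern.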

[oracle@repvm dirdef]$ cp testKAFKA.def /data/ogg_for_kafka/dirdef
Properties file:
[oracle@repvm dirprm]$ cat repkk.props 
gg.handlerlist =kafkahandler
#gg.handler.kafkahandler.type=oracle.goldengate.handler.kafka.KafkaHandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=core_kafka_producer.properties
gg.handler.kafkahandler.TopicName =zqtest
gg.handler.kafkahandler.format =avro_op
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode =tx
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
#gg.log.level=INFO
gg.log.level=DEBUG
gg.report.time=30sec
#gg.classpath=dirprm/:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar:/data/ogg_for_kafka/dirprm/kafka_jar/*:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*
gg.classpath=dirprm:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*
javawriter.bootoptions=-Xmx4096m -Xms4096m -Djava.class.path=/data/ogg_for_kafka/ggjava/ggjava.jar:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties
[oracle@repvm dirprm]$
Note: javawriter.bootoptions must include the OGG for Kafka lib jars.
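Because a missing handler property in repkk.props is easy to overlook, a small pre-flight check can help. The sketch below parses simple `key=value` lines and reports which per-handler keys are absent. The set of keys it checks (type, KafkaProducerConfigFile, TopicName, format) is simply what this article's file uses; treating them as required is an assumption, not a statement of what OGG mandates. Both function names are hypothetical.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def missing_handler_keys(props):
    """Return expected per-handler keys that are absent from props.

    The suffix list mirrors the keys used in this article's repkk.props;
    whether OGG strictly requires each one is an assumption.
    """
    handlers = [h.strip() for h in props.get("gg.handlerlist", "").split(",")]
    missing = []
    for handler in filter(None, handlers):
        for suffix in ("type", "KafkaProducerConfigFile", "TopicName", "format"):
            key = f"gg.handler.{handler}.{suffix}"
            if key not in props:
                missing.append(key)
    return missing
```

Calling `missing_handler_keys(parse_properties(open("dirprm/repkk.props").read()))` on the file shown above should return an empty list, since all four keys are present for `kafkahandler`.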

Kafka producer properties file (core_kafka_producer.properties):
bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
reconnect.backoff.ms = 1000
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size = 102400
linger.ms = 10000
max.request.size = 5024000
send.buffer.bytes = 5024000
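The `batch.size` and `linger.ms` settings above work together: the producer sends a per-partition batch once it is full, or once it has waited for the linger time, whichever comes first. The sketch below is a deliberately simplified model of that rule using the values from the file above. It ignores other triggers such as explicit flushes or buffer pressure, and the function name `should_send` is hypothetical.

```python
def should_send(batch_bytes, batch_age_ms, batch_size=102400, linger_ms=10000):
    """Return True when a batch is full or has waited at least linger_ms.

    Defaults mirror batch.size = 102400 and linger.ms = 10000 above.
    This is a simplified model, not the Kafka client's actual code.
    """
    return batch_bytes >= batch_size or batch_age_ms >= linger_ms
```

Under this model, with `linger.ms = 10000` a small batch on a quiet table may wait up to ten seconds before being sent, which is worth keeping in mind when a test message seems slow to arrive.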

compression.type parameter default:

Configure Kafka:
[oracle@repvm kafka_2.10-0.8.2.2]$ pwd
/data/kafka_2.10-0.8.2.2
[oracle@repvm kafka_2.10-0.8.2.2]$ 
[oracle@repvm kafka_2.10-0.8.2.2]$ grep -v '^$\|^\s*\#' config/server.properties
broker.id=0
port=9092
num.network.threads=3
 
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
[oracle@repvm kafka_2.10-0.8.2.2]$ 
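The grep command above prints only the active settings in server.properties by dropping empty lines and comment lines. The same filter can be sketched in Python as below; the function name `active_lines` is hypothetical. Like the grep pattern, it drops only truly empty lines, so a line containing just whitespace is kept, which may explain the blank-looking line in the output above.

```python
import re

# Same idea as grep -v '^$\|^\s*\#': skip empty lines and lines whose
# first non-whitespace character is '#'.
_SKIP = re.compile(r"^$|^\s*#")


def active_lines(text):
    """Return the lines of text that are neither empty nor comments."""
    return [line for line in text.splitlines() if not _SKIP.match(line)]
```

For example, `active_lines(open("config/server.properties").read())` would return the same settings that the grep command prints.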

[oracle@repvm libs]$ ll
total 17452
-rw-r--r-- 1 oracle oinstall   53244 Aug 31  2014 jopt-simple-3.2.jar
-rw-r--r-- 1 oracle oinstall 3991269 Sep  3  2015 kafka_2.10-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall   37748 Sep  3  2015 kafka_2.10-0.8.2.2-javadoc.jar
-rw-r--r-- 1 oracle oinstall 2324165 Sep  3  2015 kafka_2.10-0.8.2.2-scaladoc.jar
-rw-r--r-- 1 oracle oinstall  521466 Sep  3  2015 kafka_2.10-0.8.2.2-sources.jar
-rw-r--r-- 1 oracle oinstall 1233391 Sep  3  2015 kafka_2.10-0.8.2.2-test.jar
-rw-r--r-- 1 oracle oinstall  324016 Sep  3  2015 kafka-clients-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall  481535 Aug 31  2014 log4j-1.2.16.jar
-rw-r--r-- 1 oracle oinstall  165505 Aug 31  2014 lz4-1.2.0.jar
-rw-r--r-- 1 oracle oinstall   82123 Aug 31  2014 metrics-core-2.2.0.jar
-rw-r--r-- 1 oracle oinstall 7126372 Nov 25  2014 scala-library-2.10.4.jar
-rw-r--r-- 1 oracle oinstall   28688 Aug 31  2014 slf4j-api-1.7.6.jar
-rw-r--r-- 1 oracle oinstall    9753 Aug 31  2014 slf4j-log4j12-1.6.1.jar
-rw-r--r-- 1 oracle oinstall  594033 May 29  2015 snappy-java-1.1.1.7.jar
-rw-r--r-- 1 oracle oinstall   64009 Aug 31  2014 zkclient-0.3.jar
-rw-r--r-- 1 oracle oinstall  792964 Aug 31  2014 zookeeper-3.4.6.jar
[oracle@repvm libs]$

[oracle@repvm bin]$ pwd
/data/kafka_2.10-0.8.2.2/bin
[oracle@repvm bin]$
nohup sh  kafka-server-start.sh ../config/server.properties > /tmp/server.log &

Start ZooKeeper first:
[oracle@repvm kafka_2.10-0.8.2.2]$ nohup  bin/zookeeper-server-start.sh config/zookeeper.properties &
[1] 18645
nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2]$ tail -f nohup.out 
[2016-06-02 12:22:42,981] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:os.version=2.6.32-358.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.name=oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.home=/home/oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.dir=/data/kafka_2.10-0.8.2.2 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,035] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
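The last log line above shows ZooKeeper binding to port 2181. Before moving on, it can be useful to confirm that a service is actually accepting connections. The following minimal sketch uses only the Python standard library; the function name `port_open` is hypothetical, and a successful TCP connect only shows that something is listening, not that the service is healthy.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

In this setup, `port_open("localhost", 2181)` would check ZooKeeper, and `port_open("localhost", 9092)` would check the Kafka broker once it has been started.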
Start Kafka:
[oracle@repvm kafka_2.10-0.8.2.2]$ nohup bin/kafka-server-start.sh config/server.properties &
[1] 18845
nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2]$
Create the topic:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zqtest
Created topic "zqtest".
[oracle@repvm kafka_2.10-0.8.2.2]$ 

List topics:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
zqtest
[oracle@repvm kafka_2.10-0.8.2.2]$  ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zqtest
Test sending a message:
[oracle@repvm kafka_2.10-0.8.2.2]$  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zqtest
[2016-06-02 14:33:50,690] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
ds
Consumer side:
[oracle@repvm kafka_2.10-0.8.2.2]$  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zqtest --from-beginning
ds
The message was received successfully.

That concludes this walkthrough of sending Oracle data to Kafka. Theory is best learned alongside practice, so try the steps out yourself.
