您好,登錄后才能下訂單哦!
一、簡介
1.1 描述
ActiveMQ不僅支持persistent和non-persistent兩種方式,還支持消息的恢復(recovery)方式、重新投遞等
1.2 PTP與PUB/SUB
1.2.1 PTP
對于持久化訂閱主題,每一個消費者將獲得一個消息的復制。
1.2.2 PUB/SUB
對于持久化訂閱主題,每一個消費者將獲得一個消息的復制。
1.3 有效的消息存儲
ActiveMQ提供了一個插件式的消息存儲,類似于消息的多點傳播,主要實現了如下幾種:
1:AMQ消息存儲-基于文件的存儲方式,是以前的默認消息存儲
2:KahaDB消息存儲-提供了容量的提升和恢復能力,是現在的默認存儲方式
3:JDBC消息存儲-消息基于JDBC存儲的
4:Memory 消息存儲-基于內存的消息存儲
二、持久化
2.1 KahaDB消息存儲
1、KahaDB是目前默認的存儲方式,可用于任何場景,提高了性能和恢復能力。消息存儲使用一個事務日志和僅僅用一個索引文件來存儲它所有的地址。
在:activemq/conf/activemq.xml中可以查看
2、KahaDB是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優化。在Kaha中,數據被追加到data logs中。當不再需要log文
件中的數據的時候,log文件會被丟棄。
3、KahaDB基本配置例子:
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
可用的屬性有:
1:director:KahaDB存放的路徑,默認值activemq-data
2:indexWriteBatchSize: 批量寫入磁盤的索引page數量,默認值1000
3:indexCacheSize:內存中緩存索引page的數量,默認值10000
4:enableIndexWriteAsync:是否異步寫出索引,默認false
5:journalMaxFileLength:設置每個消息data log的大小,默認是32MB
6:enableJournalDiskSyncs:設置是否保證每個沒有事務的內容,被同步寫入磁盤,JMS持久化的時候需要,默認為true
7:cleanupInterval:在檢查到不再使用的消息后,在具體刪除消息前的時間,默認30000
8:checkpointInterval:checkpoint的間隔時間,默認5000
9:ignoreMissingJournalfiles:是否忽略丟失的消息日志文件,默認false
10:checkForCorruptJournalFiles:在啟動的時候,將會驗證消息文件是否損壞,默認false
11:checksumJournalFiles:是否為每個消息日志文件提供checksum,默認false
12:archiveDataLogs: 是否移動文件到特定的路徑,而不是刪除它們,默認false
13:directoryArchive:定義消息已經被消費過后,移動data log到的路徑,默認null
14:databaseLockedWaitDelay:獲得數據庫鎖的等待時間 (used by shared master/slave),默認10000
15:maxAsyncJobs:設置最大的可以存儲的異步消息隊列,默認值10000,可以和concurrentMessageProducers 設置成一樣的值
16:concurrentStoreAndDispatchTransactions:是否分發消息到客戶端,同時事務存儲消息,默認true
17:concurrentStoreAndDispatchTopics:是否分發Topic消息到客戶端,同時進行存儲,默認true
18:concurrentStoreAndDispatchQueues:是否分發queue消息到客戶端,同時進行存儲,默認true
2.2 AMQ消息存儲
1、AMQ Message Store是ActiveMQ5.0缺省的持久化存儲,它是一個基于文件、事務存儲設計為快速消息存儲的一個結構,該結構是以流的形式來
進行消息交互的。
2、這種方式中,Messages被保存到data logs中,同時被reference store進行索引以提高存取速度。Date logs由一些單獨的data log文件組成,缺省的
文件大小是32M,如果某個消息的大小超過了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某個data log文件中所有的消
息都被成功消費了,那么這個data log文件將會被標記,以便在下一輪的清理中被刪除或者歸檔。
3、配置示例
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter> <amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/> </persistenceAdapter> </broker>
2.3 JDBC消息存儲
ActiveMQ支持使用JDBC來持久化消息
2.3.1 預定義的表
1:消息表,缺省表名為ACTIVEMQ_MSGS,queue和topic都存在里面,結構如下:
2:ACTIVEMQ_ACKS表存儲持久訂閱的信息和最后一個持久訂閱接收的消息ID,結構如下:
3:鎖定表,缺省表名為ACTIVEMQ_LOCK,用來確保在某一時刻,只能有一個ActiveMQ broker實例來訪問數據庫 ,結構如下:
2.3.2 配置activemq/conf/activemq.xml
<persistenceAdapter> <!-- <kahaDB directory="${activemq.data}/kahadb"/> --> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> ... ... </broker> <bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://192.168.91.4:3306/db_activemq?useUnicode=true&characterEncoding=UTF-8" /> <property name="username" value="root" /> <property name="password" value="123456"/> </bean>
注意:此處需要上傳數據庫驅動包到/opt/activemq/lib下,我這里連接池用的是阿里的,所以還要上傳druid包
重啟activemq
測試發送3條隊列消息:
補充:JDBC Message Store with ActiveMQ Journal
這種方式克服了JDBC Store的不足,使用快速的緩存寫入技術,大大提高了性能。 配置將persistenceAdapter注釋掉,再上persistenceFactory,示例如下:
<!-- <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> --> <persistenceFactory> <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds" dataDirectory="activemq-data" /> </persistenceFactory>
JDBC Store和JDBC Message Store with ActiveMQ Journal的區別:
1:Jdbc with journal的性能優于jdbc
2:Jdbc用于master/slave模式的數據庫分享
3:Jdbc with journal不能用于master/slave模式
4:一般情況下,推薦使用jdbc with journal
2.4 Memory消息存儲
內存消息存儲主要是存儲所有的持久化的消息在內存中。這里沒有動態的緩存存在,所以你必須注意設置你的broker所在的JVM和內存限制。
配置示例:
<broker ... persistent="false" ...></broker>
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。