更新時間:2022-05-07 09:40:20 來源:動力節點 瀏覽2591次
在JMS教程中講規范的時候說過:消息的可靠性通過三個方面保證——持久化、事務和簽收。這里說一下ActiveMQ中消息持久化的方式。
ActiveMQ中常用的持久化機制有JDBC(將消息持久化到Mysql等數據庫)、AMQ(低版本中的默認方案)、KahaDB(5.4版本以后的默認持久化方案)和LevelDB(在以后的版本中可能會變成默認的持久化方案)。有了持久化機制,消息在發送后會首先持久化到對應的文件或數據表中,在消息被消費后,再從這些文件或表中刪除(對于持久化的主體消息不會刪除)。
有了消息的持久化,在消息服務器啟動時會先從持久化的媒介中讀取之前未消費的消息等信息,并將讀取到的消息發送給消息的訂閱者或者消費者去消費,這樣就不會出現消息的丟失,保證了消息的可靠性。
ActiveMQ中主要有以下幾種持久化的方式:
AMQ Message Stroe:基于文件的存儲方式,具有寫入速度快和容易恢復的特點,消息存儲在一個個文件中,文件的默認大小是32M,是之前的默認持久化方式,現在幾乎不用了
KahaDB:基于日志文件的持久化方式,從Active5.4版本后作為默認的持久化方式,在activemq.xml文件中可以看到如下配置:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
即默認情況下會將消息持久化到activemq安裝目錄下的/data/kahadb目錄下,在該目錄下有4個或5個文件,用來保存持久化的消息數據(db-n.log)和索引(db.data)等:數據會被追加到db-n.log文件中,當不再需要某一個db-n.log文件中的數據的時候,該log文件會被丟棄。
這些文件的作用介紹:
db-<n>.log:KahaDB存儲消息到預定義大小的數據記錄文件中,文件命名為db-.log,當數據文件已滿時,會創建一個新的數據記錄文件,n的值也會隨之遞增,文件名按照數字進行編號,如db-1.log、db-2.log、db-3.log、…當不再有引用道數據文件中的任何消息時,文件會被刪除或歸檔。
db.data:該文件包含了持久化的BTree索引,索引了消息數據記錄中的消息,他是消息的索引文件,本質上是B-Tree,使用B-Tree作為索引到db-n.log中找消息
db.free:當前db.data文件中有哪些頁是空閑的,文件的具體內容是所有空閑頁的ID
db.redo:用來進行消息恢復
lock:文件鎖,表示當前獲得KahaDB讀寫權限的broker
JDBC消息存儲:基于JDBC的消息存儲,可借助于這種方式將消息持久化到Mysql等數據庫中
LevelDB消息存儲:這種文件系統是從ActiveMQ5.8之后引進的,和KahaDB非常相似,也是基于文件的本地數據庫存儲形式,但是它提供了比KahaDB更快的持久性,它不是使用B-Tree索引日志,而是使用基于LevelDB的索引
JDBC + ActiveMQ高速緩存:即JDBC Message store with ActiveMQ Journal,和JDBC的方式很相似,只不過是在持久化到的數據庫和ActiveMQ服務器之間加了一層高速緩存
由于ActiveMQ是基于Java語言開發的,因此我們可以直接在其項目的lib包下加入數據庫驅動的jar包或者數據庫連接池的jar包等。
(1)將Mysql的驅動jar包加入到ActiveMQ的lib目錄下
(2)在activemq.xml文件中配置數據源:activemq.xml類似于Spring的容器配置文件
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.2.143:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
(3)修改消息的持久化方式:由默認的KahaDB改為現在的JDBC的方式
改之前:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
改之后:注意數據源的名稱要一致,createTablesOnStartup是說重啟MQ服務后是否自動創建ActiveMQ相關的表,一般設置為true
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
(4)在連接的數據庫服務器上創建使用的庫
(5)配置好后,重啟MQ服務,會看到數據庫中多了3張表,這些表是ActiveMQ啟動服務時創建的根ActiveMQ相關的表
activemq_acks:該表記錄的是主題訂閱關系(消息簽收者)信息
activemq_msgs:該表記錄的是待消費的消息,該表中的消息一旦被消費則會被刪除(主題類的消息不會被清除)
activemq_lock:在集群環境中才有用,用于記錄哪個Broker是當前的Master Broker
這樣配置好后,一旦有消息產生就會在相應的表中看到記錄:但隊列中的消息一旦被消費就又會被刪除
配置JDBC持久化的高速緩存:僅有上面的配置,則消息生產者每次有消息需要持久化都會通過JDBC去調用數據庫服務以持久化數據,但像隊列中的消息,大部分在消費后又需要清除,進行及其短暫的持久化意義不大,但卻依然使JDBC頻繁的和數據庫進行交互,效率低下。因此可以在ActiveMQ和數據庫服務器之間加一層高速緩存,用來暫時緩存消息,若隊列中的消息長時間沒有被消費時才進行持久化,這樣就會大大減少ActiveMQ和數據庫服務交互的次數,提高效率。舉例來說:生產者生產了1000條消息,這1000條消息會保存到緩存文件journal文件中,如果消費者的消費速度很快,在journal文件還沒有同步到數據庫之前,消費者已經消費了900條,那么這時就只需要將剩余的100條同步即可,如果消費者消費的很慢,journal文件可以批量的將消息同步到數據庫,大大減少了ActiveMQ和數據庫服務器的交互次數。將持久化的方式改為如下方式:
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data"
/>
</persistenceFactory>
以上就是關于“使ActiveMQ消息持久化的方法”介紹,大家如果對此比較感興趣,想了解更多相關知識,不妨來關注一下動力節點的ActiveMQ教程,里面還有更豐富的知識等著大家去學習,相信對大家一定會有所幫助的。
0基礎 0學費 15天面授
有基礎 直達就業
業余時間 高薪轉行
工作1~3年,加薪神器
工作3~5年,晉升架構
提交申請后,顧問老師會電話與您溝通安排學習