更新時間:2020-09-11 15:17:35 來源:動力節(jié)點(diǎn) 瀏覽1356次
一、什么是消息中間件
消息中間件顧名思義實(shí)現(xiàn)的就是在兩個系統(tǒng)或兩個客戶端之間進(jìn)行消息傳送
二、什么是ActiveMQ
ActiveMQ是一種開源的基于JMS(Java Message Servie)規(guī)范的一種消息中間件的實(shí)現(xiàn),ActiveMQ的設(shè)計目標(biāo)是提供標(biāo)準(zhǔn)的,面向消息的,能夠跨越多語言和多系統(tǒng)的應(yīng)用集成消息通信中間件。
三、什么時候需要用ActiveMQ
ActiveMQ常被應(yīng)用與系統(tǒng)業(yè)務(wù)的解耦,異步消息的推送,增加系統(tǒng)并發(fā)量,提高用戶體驗(yàn)。例如以我在工作中的使用,在比較耗時且異步的遠(yuǎn)程開鎖操作時
四、如何使用ActiveMQ
1.AcitveMQ的數(shù)據(jù)傳送流程
2.ActiveMQ的兩種消息傳遞類型
(1)點(diǎn)對點(diǎn)傳輸,即一個生產(chǎn)者對應(yīng)一個消費(fèi)者,生產(chǎn)者向broke推送數(shù)據(jù),數(shù)據(jù)存儲在broke的一個隊(duì)列中,當(dāng)消費(fèi)者接受該條隊(duì)列里的數(shù)據(jù)。
(2)基于發(fā)布/訂閱模式的傳輸,即根據(jù)訂閱話題來接收相應(yīng)數(shù)據(jù),一個生產(chǎn)者可向多個消費(fèi)者推送數(shù)據(jù),與MQTT協(xié)議的實(shí)現(xiàn)是類似的,對MQTT協(xié)議有興趣的可跳轉(zhuǎn)到https://www.cnb·ogs.com/xiguadadage/p/11216463.htm·
兩種消息傳遞類型的不同,點(diǎn)對點(diǎn)傳輸消費(fèi)者可以接收到在連接之前生產(chǎn)者所推送的數(shù)據(jù),而基于發(fā)布/訂閱模式的傳輸方式消費(fèi)者只能接收到連接之后生產(chǎn)者推送的數(shù)據(jù)。
3.ActiveMQ的安裝與啟動
(1)官網(wǎng)下載對應(yīng)服務(wù)器版本
(2)解壓后進(jìn)入apache-activemq-5.15.9/bin目錄
(3)執(zhí)行./activemq start啟動ActiveMQ
(4)瀏覽器輸入ActiveMQ啟動的服務(wù)器ip:8161便可進(jìn)入web界面,點(diǎn)擊Manage ActiveMQ broker可以查看消息推送的狀態(tài),默認(rèn)賬號密碼為admin,admin
(5)啟動錯誤分析
進(jìn)入/root/apache-activemq-5.15.9/data目錄查看activemq.·og文件,根據(jù)錯誤提示信息修改,例如端口號被占用等。
4.ActiveMQ的代碼測試
(1)構(gòu)建maven項(xiàng)目,引入依賴
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
(2)生產(chǎn)者類
/**
* @Description 生產(chǎn)者 * @Date 2019/7/20 * @Created by yqh */public class MyProducer { private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616"; public static void main(String[] args) throws JMSException { // 創(chuàng)建連接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 創(chuàng)建連接 Connection connection = activeMQConnectionFactory.createConnection(); // 打開連接 connection.start(); // 創(chuàng)建會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建隊(duì)列目標(biāo),并標(biāo)識隊(duì)列名稱,消費(fèi)者根據(jù)隊(duì)列名稱接收數(shù)據(jù) Destination destination = session.createQueue("myQueue"); // 創(chuàng)建一個生產(chǎn)者 MessageProducer producer = session.createProducer(destination); // 向隊(duì)列推送10個文本消息數(shù)據(jù) for (int i = 1 ; i <= 10 ; i++){ // 創(chuàng)建文本消息 TextMessage message = session.createTextMessage("第" + i + "個文本消息"); //發(fā)送消息 producer.send(message); //在本地打印消息 System.out.println("已發(fā)送的消息:" + message.getText()); } //關(guān)閉連接 connection.close(); }}
運(yùn)行結(jié)果:
已發(fā)送的消息:第1個文本消息
已發(fā)送的消息:第2個文本消息已發(fā)送的消息:第3個文本消息已發(fā)送的消息:第4個文本消息已發(fā)送的消息:第5個文本消息已發(fā)送的消息:第6個文本消息已發(fā)送的消息:第7個文本消息已發(fā)送的消息:第8個文本消息已發(fā)送的消息:第9個文本消息已發(fā)送的消息:第10個文本消息
測試查看web后臺顯示,有10條消息在隊(duì)列中等待消費(fèi)
(3)消費(fèi)者類
/**
* @Description 消費(fèi)者類 * @Date 2019/7/20 0020 * @Created by yqh */public class MyConsumer { private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616"; public static void main(String[] args) throws JMSException { // 創(chuàng)建連接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 創(chuàng)建連接 Connection connection = activeMQConnectionFactory.createConnection(); // 打開連接 connection.start(); // 創(chuàng)建會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建隊(duì)列目標(biāo),并標(biāo)識隊(duì)列名稱,消費(fèi)者根據(jù)隊(duì)列名稱接收數(shù)據(jù) Destination destination = session.createQueue("myQueue"); // 創(chuàng)建消費(fèi)者 MessageConsumer consumer = session.createConsumer(destination); // 創(chuàng)建消費(fèi)的監(jiān)聽 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消費(fèi)的消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); }}
測試結(jié)果:
消費(fèi)的消息:第1個文本消息
消費(fèi)的消息:第2個文本消息消費(fèi)的消息:第3個文本消息消費(fèi)的消息:第4個文本消息消費(fèi)的消息:第5個文本消息消費(fèi)的消息:第6個文本消息消費(fèi)的消息:第7個文本消息消費(fèi)的消息:第8個文本消息消費(fèi)的消息:第9個文本消息消費(fèi)的消息:第10個文本消息
web后臺顯示有一個消費(fèi)者處于連接狀態(tài),且已消費(fèi)了10個message,而該條隊(duì)列已沒有message待消費(fèi)了
(4)當(dāng)我們運(yùn)行兩個消費(fèi)者類,消息又是怎么被消費(fèi)的呢?是兩個消費(fèi)者都能收到生產(chǎn)者生產(chǎn)的message,還是只有其中一個消費(fèi)者能消費(fèi)呢?
我們先運(yùn)行兩個消費(fèi)者,在運(yùn)行一個生產(chǎn)者對目標(biāo)隊(duì)列生產(chǎn)10個message,會發(fā)現(xiàn)有以下情況
// Consumer1控制臺
消費(fèi)的消息:第1個文本消息消費(fèi)的消息:第3個文本消息消費(fèi)的消息:第5個文本消息消費(fèi)的消息:第7個文本消息消費(fèi)的消息:第9個文本消息
// Consumer2控制臺
消費(fèi)的消息:第2個文本消息消費(fèi)的消息:第4個文本消息消費(fèi)的消息:第6個文本消息消費(fèi)的消息:第8個文本消息消費(fèi)的消息:第10個文本消息
即隊(duì)列中的數(shù)據(jù)會平均的分給每一個消費(fèi)者消費(fèi),且每一條數(shù)據(jù)只能被消費(fèi)一次
(5)以上是基于隊(duì)列點(diǎn)對點(diǎn)的傳輸類型,以下是基于發(fā)布/訂閱模式傳輸?shù)念愋蜏y試
/**
* @Description 基于發(fā)布/訂閱模式傳輸類型的生產(chǎn)者測試 * @Date 2019/7/20 0020 * @Created by yqh */public class MyProducerForTopic { private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616"; public static void main(String[] args) throws JMSException { // 創(chuàng)建連接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 創(chuàng)建連接 Connection connection = activeMQConnectionFactory.createConnection(); // 打開連接 connection.start(); // 創(chuàng)建會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建隊(duì)列目標(biāo),并標(biāo)識隊(duì)列名稱,消費(fèi)者根據(jù)隊(duì)列名稱接收數(shù)據(jù) Destination destination = session.createTopic("topicTest"); // 創(chuàng)建一個生產(chǎn)者 MessageProducer producer = session.createProducer(destination); // 向隊(duì)列推送10個文本消息數(shù)據(jù) for (int i = 1 ; i <= 10 ; i++){ // 創(chuàng)建文本消息 TextMessage message = session.createTextMessage("第" + i + "個文本消息"); //發(fā)送消息 producer.send(message); //在本地打印消息 System.out.println("已發(fā)送的消息:" + message.getText()); } //關(guān)閉連接 connection.close(); }}
/**
* @Description 基于發(fā)布/訂閱模式傳輸類型的消費(fèi)者測試 * @Date 2019/7/20 0020 * @Created by yqh */public class MyConsumerForTopic { private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616"; public static void main(String[] args) throws JMSException { // 創(chuàng)建連接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 創(chuàng)建連接 Connection connection = activeMQConnectionFactory.createConnection(); // 打開連接 connection.start(); // 創(chuàng)建會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建隊(duì)列目標(biāo),并標(biāo)識隊(duì)列名稱,消費(fèi)者根據(jù)隊(duì)列名稱接收數(shù)據(jù) Destination destination = session.createTopic("topicTest"); // 創(chuàng)建消費(fèi)者 MessageConsumer consumer = session.createConsumer(destination); // 創(chuàng)建消費(fèi)的監(jiān)聽 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消費(fèi)的消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); }}
現(xiàn)在如果我們先啟動生產(chǎn)者,再啟動消費(fèi)者,會發(fā)現(xiàn)消費(fèi)者是無法接收到之前生產(chǎn)者之前所生產(chǎn)的數(shù)據(jù),只有消費(fèi)者先啟動,再讓生產(chǎn)者消費(fèi)才可以正常接收數(shù)據(jù),這也是發(fā)布/訂閱的主題模式與點(diǎn)對點(diǎn)的隊(duì)列模式的一個明顯區(qū)別。
而如果啟動兩個消費(fèi)者,那么每一個消費(fèi)者都能完整的接收到生產(chǎn)者生產(chǎn)的數(shù)據(jù),即每一條數(shù)據(jù)都被消費(fèi)了兩次,這是發(fā)布/訂閱的主題模式與點(diǎn)對點(diǎn)的隊(duì)列模式的另一個明顯區(qū)別。
以上就是動力節(jié)點(diǎn)java培訓(xùn)機(jī)構(gòu)的小編針對“activemq視頻講解之使用說明”的內(nèi)容進(jìn)行的回答,希望對大家有所幫助,如有疑問,請?jiān)诰€咨詢,有專業(yè)老師隨時為你服務(wù)。
初級 202925
初級 203221
初級 202629
初級 203743