更新時間:2021-01-25 16:16:59 來源:動力節點 瀏覽1464次
RabbitMQ是使用Erlang語言開發的消息中間件,其遵循了高級消息隊列協議(Advanced Message Queuing Protocol,AMQP)。
與Kafka等消息隊列相比,RabbitMQ最大的優勢在于其較高的可靠性:
提供確認(ACK)和重傳機制保證消息完成消費,消費者異常不會導致消息丟失
提供消息持久化機制,broker崩潰不會導致消息丟失
集群模式下工作,保證高可用
因為具有較高可靠性和一致性,RabbitMQ可以勝任訂單處理、秒殺等一致性要求較高的業務場景。
RabbitMQ概念與機制
RabbitMQ中的概念模型:
Broker:消息中間件實例,可能是單個節點也可能是運行在多節點集群上的邏輯實體
消息(Message):消息由消息頭和消息體兩部分組成。消息頭中包括routing-key、priority等標準消息頭以及其它自定義消息頭,用于定義RabbitMQ對消息行為。消息體是字節流,包含消息內容。
連接(Connection):客戶端與Broker之間的TCP連接
信道(Channel):Channel是建立在TCP連接上的邏輯(虛擬)連接。多個Channel復用同一個TCP連接,以避免建立TCP連接的巨大開銷。RabbitMQ官方要求每個線程使用獨立的Channel,禁止多個線程共用Channel。
生產者(Publisher):發送消息的客戶端線程
消費者(Consumer):處理消息的客戶端線程
交換機(Exchange):交換機負責將消息投遞到相應的隊列
隊列(Queue):接收并保存交換機投遞的消息,直至被消費者成功消費。邏輯結構遵循先進先出FIFO。
綁定(Binding):將隊列(Queue)注冊到交換機(Exchange)的路由表
虛擬主機(Vhost):每個Broker下可建立多個vhost,每個vhost可建立獨立的Exchange、Queue、綁定及權限系統。同一個Broker下的vhost共享Connection、Channel和用戶系統,就是說可以使用同一個用戶身份使用同一個Channel訪問不同vhost。
交換機(Exchange)
生產者發送的消息會首先送到交換機(Exchange),交換機根據自身類型和消息的routing-key等信息將消息投遞到綁定的消息隊列中。
RabbitMQ中的四種標準交換機:
direct:如果消息的routing-key與隊列的binding-key完全相同,direct類型的交換機則會將消息投遞到該隊列中。
多個隊列可以使用相同的binding-key綁定到同一個direct交換機,direct交換機會把消息投遞到所有binding-key與消息routing-key相同的隊列
topic:允許隊列的binding-key中包含通配符*和#,topic交換機會將消息投遞到binding-key與routing-key匹配的隊列中。
通配符按照關鍵字進行匹配,如news.cn.a中的關鍵字是news、cn和a,即關鍵字按照.分割
#通配符匹配0個或多個關鍵字,news.#.a可以匹配news.a,news.cn.a和news.asia.cn.a等
*通配符匹配一個關鍵字,news.*.a匹配news.cn.a不匹配news.a、news.asia.cn.a
fanout:fanout交換機不進行任何匹配,將消息投遞到所有綁定的隊列
header:header交換機根據消息頭進行投遞,現在已較少使用
我們可以使用RabbitMQ的插件機制使用第三方交換機或自行開發交換機。如實現延時投遞的delayed-message-exchange。
消息頭中的delivery-mode可以設置為persistent(持久化)或者transient(易失)。Exchange和Queue在處理持久化的消息時都會先將消息寫入磁盤中再進行下一步處理,即使RabbitMQ崩潰也不會丟失。
消費者客戶端通常使用的channel.basicConsume使用推(push)模式投遞消息,即當有新消息時Broker通過channel主動向客戶端發送消息。客戶端也可以使用channel.basicGet從Broker拉取消息。
ACK機制
RabbitMQ提供了確認送達(acknowledge)機制保證消息被正確處理不會丟失。
確認送達的回執有三種:
ACK:消息已被成功處理
NACK:消息處理異常,需要重新投遞
REJECT:消息非法,丟棄消息
RabbitMQ的Queue可以設置no_ack=true,則消息被投遞后即刪除不等待回執。
channel.basicConsume可以指定auto_ack模式,若auto_ack=true當客戶端收到完整消息后即會自動發出ACK回執,否則必須顯式的發出回執。
Java代碼示例
首先安裝并啟動RabbitMQ實例,Mac用戶可以使用Homebrew進行安裝:
brew install rabbitmq
啟動服務:
brew services start rabbitmq
或者使用官方docker鏡像:
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management
RabbitMQ官網提供了Ubuntu、RPM以及Windows等多種平臺安裝方式。
RabbitMQ默認TCP端口為5672,Web控制臺默認端口15672。
在Maven中添加依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
編寫生產者:
package rabbit;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author finley
*/
public class RabbitProducer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel channel = conn.createChannel()) {
String exchangeName = "test-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hello";
byte[] msg = "hello world".getBytes();
AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
propsBuilder.deliveryMode(2); // persistent
propsBuilder.priority(0); // normal
propsBuilder.contentType("text/plain");
channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg);
}
}
}
編寫消費者:
package rabbit;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
/**
* @author finley
*/
public class RabbitConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel channel = conn.createChannel()) {
String exchangeName = "test-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
String bindingKey = "hello";
channel.queueBind(queueName, exchangeName, bindingKey);
while(true) {
channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
String bodyStr = new String(body, "UTF-8");
System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr);
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
});
}
}
}
}
RabbitMQ是流行的開源消息隊列系統,用erlang語言開發,RabbitMQ是AMQP(高級消息隊列協議)的標準實現。采用該技術,我們可以實現異步處理、流量削峰、系統解耦;動力節點RabbitMQ視頻教程,課程將講授RabbitMQ的環境搭建、消息的發送與接收、消息確認、與SpringBoot集成等,讓大家快速掌握RabbitMQ技術,以適應項目開發的需要;
以上就是動力節點Java培訓機構的小編針對“Rabbitmq消息中間件視頻,入門學習”的內容進行的回答,希望對大家有所幫助,如有疑問,請在線咨詢,有專業老師隨時為你服務。
0基礎 0學費 15天面授
有基礎 直達就業
業余時間 高薪轉行
工作1~3年,加薪神器
工作3~5年,晉升架構
提交申請后,顧問老師會電話與您溝通安排學習