更新時(shí)間:2022-05-10 10:18:20 來(lái)源:動(dòng)力節(jié)點(diǎn) 瀏覽3607次
當(dāng)發(fā)生以下情況之一時(shí),來(lái)自消息隊(duì)列的可能是“死信”:
消息被拒絕并且重新排隊(duì)設(shè)置為 false
消息的 TTL 過(guò)期
超出隊(duì)列長(zhǎng)度限制
為了通過(guò)示例進(jìn)行演示,我選擇了第一種情況,即消息被拒絕。生產(chǎn)者將PaymentOrders作為消息發(fā)送,這些消息將由消費(fèi)者處理。當(dāng)PaymentOrder付款人賬戶(hù)資金不足時(shí),消息將被拒絕。
生產(chǎn)者是一個(gè) Spring Boot 應(yīng)用程序,它使用Spring AMQP庫(kù)向PaymentOrderRabbitMQ 發(fā)送消息。
生產(chǎn)者的 API
生產(chǎn)者 API 的第一部分是定義交換器的名稱(chēng)、路由密鑰、傳入和死信隊(duì)列。
public class Constants {
public static final String EXCHANGE_NAME = "payment-orders.exchange";
public static final String ROUTING_KEY_NAME = "payment-orders";
public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
}
第二部分是定義消息格式。我們?cè)诖耸纠惺褂?JSON。以下 JSON 文檔顯示了我們?nèi)绾谓aymentOrder
{
"from":"SA54 22PS JCLV 7LWT 7LHY EBLO",
"to":"IT23 K545 5414 339G WLPI 2YF6 VBP",
"amount":54.75
}
請(qǐng)注意,最好不要使用自定義序列化格式(如有效負(fù)載的 Java 序列化),因?yàn)檫@意味著您需要有一個(gè)基于 Java 的使用者。好的做法是將有效負(fù)載格式化為 JSON。每個(gè)平臺(tái)和/或語(yǔ)言都可以解析 JSON。
生產(chǎn)者配置
我們需要配置 AMQP 基礎(chǔ)設(shè)施。死信隊(duì)列配置封裝在傳入隊(duì)列聲明中。
有一個(gè)死信交換direct(DLX) 的概念,它是類(lèi)型topic或的正常交換fanout。如果在處理從隊(duì)列中獲取的消息期間發(fā)生故障,RabbitMQ 會(huì)檢查是否為該隊(duì)列配置了死信交換。如果通過(guò)x-dead-letter-exchange參數(shù)配置了一個(gè),那么它將使用原始路由密鑰將失敗的消息路由到它??梢酝ㄟ^(guò)x-dead-letter-routing-key參數(shù)覆蓋此路由鍵。
在此示例中,我們使用default exchange(no-name) 作為 the dead letter exchange,并使用死信隊(duì)列名稱(chēng)作為新的路由鍵。這將起作用,因?yàn)槿魏侮?duì)列都綁定到默認(rèn)交換,綁定鍵等于隊(duì)列名稱(chēng)。
@Configuration
public class AmqpConfig {
@Bean
DirectExchange exchange() {
return new DirectExchange(Constants.EXCHANGE_NAME);
}
@Bean
Queue incomingQueue() {
return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
.build();
}
@Bean
Binding binding() {
return BindingBuilder.bind(incomingQueue()).to(exchange()).with(Constants.ROUTING_KEY_NAME);
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(Constants.DEAD_LETTER_QUEUE_NAME).build();
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
用于隊(duì)列和交換的構(gòu)建器 API 非常方便,并且從 Spring AMQP 庫(kù)的 1.6 版本開(kāi)始可用。
在 RabbitMQ 管理控制臺(tái)中,DLX和DLK標(biāo)簽指示在傳入隊(duì)列上設(shè)置了dead letter exchange和dead letter routing key參數(shù)。
生產(chǎn)者邏輯
生產(chǎn)者每 5 秒生成一次隨機(jī)PaymentOrder消息,這些消息被發(fā)送到 RabbitMQ 進(jìn)行進(jìn)一步處理。SpringAmqpTemplate是自動(dòng)配置的,它可以連接到我們的組件中。由于消息格式是 JSON ,Jackson2JsonMessageConverter因此定義了它將自動(dòng)關(guān)聯(lián)到 auto-configured AmqpTemplate。
@Component
public class Producer {
private AmqpTemplate amqpTemplate;
public Producer(AmqpTemplate amqpTemplate) {
this.amqpTemplate = amqpTemplate;
}
@Scheduled(fixedDelay = 1000L)
public void send() {
PaymentOrder paymentOrder = new PaymentOrder(
Iban.random().toFormattedString(),
Iban.random().toFormattedString(),
new BigDecimal(1D + new Random().nextDouble() * 100D).setScale(2, BigDecimal.ROUND_FLOOR));
amqpTemplate.convertAndSend(Constants.EXCHANGE_NAME, Constants.ROUTING_KEY_NAME, paymentOrder);
}
}
對(duì)于這個(gè)簡(jiǎn)單的示例,消費(fèi)者也是一個(gè) Spring Boot 應(yīng)用程序,但在實(shí)際應(yīng)用程序中,消費(fèi)者和生產(chǎn)者不必在同一平臺(tái)/語(yǔ)言上。
消費(fèi)者 API
消費(fèi)者 API 的第一部分是指定它連接到哪個(gè)隊(duì)列。
public class Constants {
public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
}
第二部分是適應(yīng)生產(chǎn)者定義的消息格式。請(qǐng)注意,在這種情況下,兩個(gè)應(yīng)用程序都是基于 Java 的,因此我可以創(chuàng)建一個(gè)包含PaymentOrder類(lèi)文件的 jar 文件并與消費(fèi)者和生產(chǎn)者共享它。然而,這是不好的做法,因?yàn)樗肓嘶诠蚕韼?kù)的緊密耦合。更好的方法是使用一些代碼重復(fù)(PaymentOrder在這種情況下為類(lèi))并通過(guò)同意消息格式來(lái)使用更松散的耦合方法。
public class PaymentOrder {
String from;
String to;
BigDecimal amount;
@JsonCreator
public PaymentOrder(@JsonProperty("from") String from,
@JsonProperty("to") String to,
@JsonProperty("amount") BigDecimal amount) {
this.from = from;
this.to = to;
this.amount = amount;
}
// getters and toString()
}
消費(fèi)者配置
消費(fèi)者只關(guān)心從中獲取消息的隊(duì)列。傳入隊(duì)列必須存在,否則消費(fèi)者將無(wú)法啟動(dòng)。請(qǐng)注意,dead letter queue消費(fèi)者啟動(dòng)時(shí)不必存在 ,但在消息需要“死信”時(shí)它應(yīng)該存在。如果它丟失,則消息將被靜默丟棄。
@Configuration
public class AmqpConfig {
@Bean
Queue incomingQueue() {
return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
.build();
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
默認(rèn)情況下啟用重新排隊(duì)。為了“死信”消息,您需要將以下屬性設(shè)置為 false。
spring:
rabbitmq:
listener:
default-requeue-rejected: false
但是,如果您想在某些錯(cuò)誤情況下啟用重新排隊(duì),最好保持啟用重新排隊(duì)并利用AmqpRejectAndDontRequeueException將發(fā)送basic.reject帶有 requeue=false 的選項(xiàng)。
消費(fèi)邏輯
每當(dāng)傳入隊(duì)列上有消息可用時(shí),將使用反序列化的實(shí)例process調(diào)用該方法。在這里,我們通過(guò)拋出一個(gè)擴(kuò)展異常PaymentOrder來(lái)模擬消息拒絕。InsufficientFundsExceptionAmqpRejectAndDontRequeueException
@Component
public class Consumer {
@RabbitListener(queues = Constants.INCOMING_QUEUE_NAME)
public void process(@Payload PaymentOrder paymentOrder) throws InsufficientFundsException {
if (new Random().nextBoolean()) {
throw new InsufficientFundsException("insufficient funds on account " + paymentOrder.getFrom());
}
}
}
下圖顯示了一條消息的示例,該P(yáng)aymentOrder消息被拒絕并最終進(jìn)入dead letter queue
有時(shí)它有助于自動(dòng)重試失敗的操作,以防它可能在后續(xù)嘗試中成功。RetryTemplateSpring AMQP 庫(kù)在Spring Retry項(xiàng)目(從 Spring Batch 中提取)的幫助下提供了對(duì)此的支持。Spring Boot 使配置變得非常容易,RetryTemplate如下面的示例所示。
spring:
rabbitmq:
listener:
retry:
enabled: true
initial-interval: 2000
max-attempts: 2
multiplier: 1.5
max-interval: 5000
使用上述配置,重試功能已啟用(默認(rèn)情況下禁用),最多應(yīng)有 2 次嘗試傳遞消息,第一次和第二次嘗試之間應(yīng)為 2 秒,稍后與上一次重試間隔乘以 1.5 和最多 5 秒。運(yùn)行您將在日志中看到的消費(fèi)者
2016-09-07 21:56:53.396 INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer : Processing at 'Wed Sep 07 21:56:53 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.399 INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer : Processing at 'Wed Sep 07 21:56:55 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.401 WARN 11995 --- [cTaskExecutor-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{"from":"RS32 5346 0536 6006 4886 88","to":"FI61 8364 3364 9834 16","amount":45.57}' MessageProperties [headers={__TypeId__=com.example.producer.api.PaymentOrder}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=payment-orders.exchange, receivedRoutingKey=payment-orders, receivedDelay=null, deliveryTag=31, messageCount=0, consumerTag=amq.ctag-vd18OXS9PSOeJmBQLY4o-w, consumerQueue=payment-orders.incoming.queue])
如您所見(jiàn),使用 RabbitMQ 配置死信隊(duì)列非常簡(jiǎn)單。如果大家想了解更多相關(guān)知識(shí),不妨來(lái)關(guān)注一下動(dòng)力節(jié)點(diǎn)的RabbitMQ教程,里面有更豐富的知識(shí)等著大家去學(xué)習(xí),希望對(duì)大家能夠有所幫助。
0基礎(chǔ) 0學(xué)費(fèi) 15天面授
有基礎(chǔ) 直達(dá)就業(yè)
業(yè)余時(shí)間 高薪轉(zhuǎn)行
工作1~3年,加薪神器
工作3~5年,晉升架構(gòu)
提交申請(qǐng)后,顧問(wèn)老師會(huì)電話(huà)與您溝通安排學(xué)習(xí)