国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

安卓網(wǎng)站開發(fā)平臺東莞百度seo電話

安卓網(wǎng)站開發(fā)平臺,東莞百度seo電話,品牌建設(shè) 政府做什么,網(wǎng)站建設(shè)銷售ppt模板目錄 一、簡介1.1 背景1.2 定義1.3 如何查看確認(rèn)/未確認(rèn)的消息數(shù)? 二、消息確認(rèn)機(jī)制的分類2.1 消息發(fā)送確認(rèn)1)ConfirmCallback方法2)ReturnCallback方法3)代碼實(shí)現(xiàn)方式一:統(tǒng)一配置a.配置類a.生產(chǎn)者c.消費(fèi)者d.測試結(jié)果 …

目錄

    • 一、簡介
      • 1.1 背景
      • 1.2 定義
      • 1.3 如何查看確認(rèn)/未確認(rèn)的消息數(shù)?
    • 二、消息確認(rèn)機(jī)制的分類
      • 2.1 消息發(fā)送確認(rèn)
        • 1)ConfirmCallback方法
        • 2)ReturnCallback方法
        • 3)代碼實(shí)現(xiàn)方式一:統(tǒng)一配置
          • a.配置類
          • a.生產(chǎn)者
          • c.消費(fèi)者
          • d.測試結(jié)果
        • 4)代碼實(shí)現(xiàn)方式二:單獨(dú)配置
      • 2.2 消息接收確認(rèn)
        • 1)basicAck() 方法
        • 2)basicReject() 方法
        • 3)basicNack() 方法
        • 4)代碼實(shí)現(xiàn)
          • a.配置方式一:代碼配置
          • b.配置方式二:配置文件
          • c.生產(chǎn)者
          • d.消費(fèi)者
          • e.測試結(jié)果

在這里插入圖片描述

當(dāng)我們在項(xiàng)目中引入了新的中間件之后,數(shù)據(jù)的風(fēng)險性就要多一層考慮。那么,RabbitMQ 的消息是怎么知道有沒有被消費(fèi)者消費(fèi)的呢?生產(chǎn)者又怎么確保自己發(fā)送成功了呢?這些問題將在文章中進(jìn)行解答。

一、簡介

1.1 背景

在 MQ 中,消費(fèi)者和生產(chǎn)者并不直接進(jìn)行通信,生產(chǎn)者只負(fù)責(zé)把消息發(fā)送到隊(duì)列,消費(fèi)者只負(fù)責(zé)從隊(duì)列獲取消息。

  • 消費(fèi)者從隊(duì)列 獲取到消息后,這條消息就不在隊(duì)列中了。如果此時消費(fèi)者所在的信道 因?yàn)榫W(wǎng)絡(luò)中斷沒有消費(fèi)到,那這條消息就 被永遠(yuǎn)地丟失了。所以,我們希望等待消費(fèi)者 成功消費(fèi)掉這條消息之后再刪除消息。
  • 生產(chǎn)者向交換機(jī) 發(fā)送消息后,也 不能保證消息準(zhǔn)確發(fā)送過去了,消息就像 石沉大海 一樣,所以 發(fā)送消息也需要進(jìn)行消息確認(rèn)。

1.2 定義

為了保證消息從隊(duì)列可靠地到達(dá)消費(fèi)者,RabbitMQ 提供了 消息確認(rèn)機(jī)制(Message Acknowledgement)

消費(fèi)者在訂閱隊(duì)列時,可以指定 autoAck 參數(shù):

  • autoAck=false:RabbitMQ 會 等待消費(fèi)者顯式地回復(fù)確認(rèn)信號 后才從內(nèi)存(或磁盤)中移除消息(實(shí)際上時先打上刪除標(biāo)記,之后再刪除)。
  • autoAck=true:RabbitMQ 會 自動把發(fā)送出去的消息置為確認(rèn),然后內(nèi)存(或磁盤)中刪除,而 不管消費(fèi)者是否真正地消費(fèi)到了這些消息。

采用消息確認(rèn)機(jī)制后,只要設(shè)置 autoAck 參數(shù)為 false消費(fèi)者就有足夠的時間處理消息(任務(wù)),不用擔(dān)心處理消息過程中消費(fèi)者進(jìn)程掛掉后消息丟失的問題,因?yàn)?RabbitMQ 會一直等待持有消息知道消費(fèi)者顯式調(diào)用 Basic.Ack 命令為止。

對于 RabbitMQ 服務(wù)器端而言,當(dāng) autoAck 參數(shù)為 false 時,隊(duì)列中的消息分成了兩部分:

  • 一部分是 等待投遞給消費(fèi)者的消息
  • 另一部分是 已經(jīng)投遞給消費(fèi)者,但是還沒有收到消費(fèi)者確認(rèn)信號的消息。

在這里插入圖片描述

如果 RabbitMQ 服務(wù)器端 一直沒有收到消費(fèi)者的確認(rèn)信息,并且 消費(fèi)此消息的消費(fèi)者已經(jīng)斷開連接,則服務(wù)器端會安排 該消息重新進(jìn)入隊(duì)列,等待投遞給下一個消費(fèi)者(也可能還是原來的那個消費(fèi)者)。

RabbitMQ 不會為未確認(rèn)的消息設(shè)置過期時間,它 判斷此消息是否需要重新投遞給消費(fèi)者的唯一依據(jù)是該消息連接是否已經(jīng)斷開,這個設(shè)計(jì)的原因是 RabbitMQ 允許消費(fèi)者消費(fèi)一條消息的時間可以很久很久。

1.3 如何查看確認(rèn)/未確認(rèn)的消息數(shù)?

RabbitMQ 的 Web 管理平臺上可以看到當(dāng)前隊(duì)列中的 “Ready” 狀態(tài)和 “Unacknowledged” 狀態(tài)的消息數(shù):

  • Read 狀態(tài): 等待投遞給消費(fèi)者的消息數(shù)。
  • Unacknowledged 狀態(tài): 已經(jīng)投遞給消費(fèi)者但是未收到確認(rèn)信號的消息樹。

在這里插入圖片描述


二、消息確認(rèn)機(jī)制的分類

RabbitMQ 消息確認(rèn)機(jī)制分為兩大類:

  1. 消息發(fā)送確認(rèn),又分為:
    • 生產(chǎn)者到交換機(jī)的確認(rèn)
    • 交換機(jī)到隊(duì)列的確認(rèn)
  2. 消息接收確認(rèn)。

2.1 消息發(fā)送確認(rèn)

RabbitMQ 的消息發(fā)送確認(rèn)有兩種實(shí)現(xiàn)方式:ConfirmCallback 方法、ReturnCallback 方法。

1)ConfirmCallback方法

ConfirmCallback 是一個回調(diào)接口,用于確認(rèn)消息否是到達(dá)交換機(jī)中。

配置方式:

spring.rabbitmq.publisher-confirm-type=correlated

它有三個值:

  • none:禁用發(fā)布確認(rèn)模式,默認(rèn)值。
  • correlated:發(fā)布消息成功到交換機(jī)后觸發(fā)回調(diào)方法。
  • simple:經(jīng)測試有兩種效果:一是和 correlated 一樣會觸發(fā)回調(diào)方法;二是在發(fā)布消息成功后使用 rabbitTemplate 調(diào)用 waitForConfirm 或 waitForConfirmsOrDie方法等待 broker 節(jié)點(diǎn)返回發(fā)送結(jié)果,根據(jù)返回結(jié)果來判定下一步的邏輯。要注意的是 waitForConfirmsOrDie 方法如果返回 false 則會關(guān)閉 channel,則接下來無法發(fā)送消息到 broker。
2)ReturnCallback方法

ReturnCallback 也是一個回調(diào)接口,用于確認(rèn)消息是否在交換機(jī)中路由到了隊(duì)列。

(該方法可以不使用,因?yàn)榻粨Q機(jī)和隊(duì)列是在代碼里面綁定的,如果消息成功投遞到 Broker 后幾乎不存在綁定隊(duì)列失敗,除非代碼寫錯了。)

配置方式:

spring.rabbitmq.publisher-returns=true
3)代碼實(shí)現(xiàn)方式一:統(tǒng)一配置
a.配置類

RabbitDirectConfig.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** <p> @Title RabbitDirectConfig* <p> @Description 直連交換機(jī)配置* Direct Exchange是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡單的模式,根據(jù)key全文匹配去尋找隊(duì)列。** @author ACGkaka* @date 2023/1/12 15:09*/
@Slf4j
@Configuration
public class RabbitDirectConfig {public static final String DIRECT_EXCHANGE_NAME = "TEST_DIRECT_EXCHANGE";public static final String DIRECT_ROUTING_NAME = "TEST_DIRECT_ROUTING";public static final String DIRECT_QUEUE_NAME = "TEST_DIRECT_QUEUE";@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);// 設(shè)置開啟Mandatory,才能觸發(fā)回調(diào)函數(shù),無論消息推送結(jié)果怎么樣都強(qiáng)制調(diào)用回調(diào)函數(shù)rabbitTemplate.setMandatory(true);//設(shè)置message序列化方法rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());// 設(shè)置消息發(fā)送到交換機(jī)(Exchange)回調(diào)rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info(">>>>>>>>>>【INFO】消息發(fā)送到交換機(jī)(Exchange)成功, 相關(guān)數(shù)據(jù): {}", correlationData);} else {log.error(">>>>>>>>>>【ERROR】消息發(fā)送到交換機(jī)(Exchange)失敗, 錯誤原因: {}, 相關(guān)數(shù)據(jù): {}", cause, correlationData);}});// 設(shè)置消息發(fā)送到隊(duì)列(Queue)回調(diào)(經(jīng)測試,只有失敗才會調(diào)用)rabbitTemplate.setReturnsCallback((returnedMessage) -> {log.error(">>>>>>>>>>【ERROR】消息發(fā)送到隊(duì)列(Queue)失敗:響應(yīng)碼: {}, 響應(yīng)信息: {}, 交換機(jī): {}, 路由鍵: {}, 消息內(nèi)容: {}",returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage());});return rabbitTemplate;}/*** 消息監(jiān)聽-反序列化*/@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}/*** 隊(duì)列,命名:testDirectQueue** @return 隊(duì)列*/@Beanpublic Queue testDirectQueue() {// durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會被存儲在磁盤上,當(dāng)消息代理重啟時仍然存在,暫存隊(duì)列:當(dāng)前連接有效// exclusive:默認(rèn)false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除。此參考優(yōu)先級高于durable。// autoDelete:是否自動刪除,當(dāng)沒有生產(chǎn)者或消費(fèi)者使用此隊(duì)列,該隊(duì)列會自動刪除。// 一般設(shè)置一下隊(duì)列的持久化就好,其余兩個默認(rèn)falsereturn new Queue(DIRECT_QUEUE_NAME, true);}/*** Direct交換機(jī),命名:testDirectExchange* @return Direct交換機(jī)*/@BeanDirectExchange testDirectExchange() {return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);}/*** 綁定 將隊(duì)列和交換機(jī)綁定,并設(shè)置用于匹配鍵:testDirectRouting* @return 綁定*/@BeanBinding bindingDirect() {return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(DIRECT_ROUTING_NAME);}
}
a.生產(chǎn)者

SendMessageController.java

import com.demo.config.RabbitDirectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** <p> @Title SendMessageController* <p> @Description 推送消息接口** @author ACGkaka* @date 2023/1/12 15:23*/
@Slf4j
@RestController
public class SendMessageController {/*** 使用 RabbitTemplate,這提供了接收/發(fā)送等方法。*/@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")public String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "Hello world.";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);// 將消息攜帶綁定鍵值:TEST_DIRECT_ROUTING,發(fā)送到交換機(jī):TEST_DIRECT_EXCHANGErabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map);return "OK";}}
c.消費(fèi)者

DirectReceiver.java

import com.demo.config.RabbitDirectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** <p> @Title DirectReceiver* <p> @Description 直連交換機(jī)監(jiān)聽類** @author ACGkaka* @date 2023/1/12 15:59*/
@Slf4j
@Component
public class DirectReceiver {@RabbitListener(queues = RabbitDirectConfig.DIRECT_QUEUE_NAME)public void process(Map<String, Object> testMessage) {System.out.println("DirectReceiver消費(fèi)者收到消息:" + testMessage.toString());}}
d.測試結(jié)果

成功發(fā)送時,執(zhí)行結(jié)果:

在這里插入圖片描述

交換機(jī)錯誤時,執(zhí)行結(jié)果:

在這里插入圖片描述

路由鍵錯誤時,執(zhí)行結(jié)果:

在這里插入圖片描述

4)代碼實(shí)現(xiàn)方式二:單獨(dú)配置

除了在配置類里面統(tǒng)一設(shè)置回調(diào)方法外,還可以在每次推送消息到隊(duì)列時,手動使用 CorrelationData 指定回調(diào)方法。

@GetMapping("/sendDirectMessage2")
public String sendDirectMessage2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "Hello world.";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);//生成唯一標(biāo)識CorrelationData correlationData = new CorrelationData(messageId);//不管成功失敗都會調(diào)用confirm或者throwable,這是異步調(diào)用correlationData.getFuture().addCallback(confirm -> {// 設(shè)置消息發(fā)送到交換機(jī)(Exchange)回調(diào)if (confirm != null && confirm.isAck()) {log.info(">>>>>>>>>>【INFO】發(fā)送成功ACK,msgId: {}, message: {}", correlationData.getId(), map);} else {log.error(">>>>>>>>>>【ERROR】發(fā)送失敗NACK,msgId: {}, message: {}", correlationData.getId(), map);}},throwable -> {//發(fā)生錯誤,鏈接mq異常,mq未打開等...報錯回調(diào)System.out.println("發(fā)送失敗throwable = " + throwable + ",  id:" + correlationData.getId());});// 將消息攜帶綁定鍵值:TEST_DIRECT_ROUTING,發(fā)送到交換機(jī):TEST_DIRECT_EXCHANGErabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map, correlationData);return "OK";
}

2.2 消息接收確認(rèn)

消費(fèi)者確認(rèn)發(fā)生在 監(jiān)聽隊(duì)列的消費(fèi)者處理業(yè)務(wù)失敗,如:發(fā)生了異常、不符合要求的數(shù)據(jù)等。這些場景就 需要我們手動處理消息,比如:重新發(fā)送消息或者丟棄消息。

RabbitMQ 的 消息確認(rèn)機(jī)制(ACK) 默認(rèn)是自動確認(rèn)的。自動確認(rèn)會 在消息發(fā)送給消費(fèi)者后立即確認(rèn),但 存在丟失消息的可能。如果消費(fèi)端消費(fèi)邏輯拋出了異常,假如我們使用了事務(wù)的回滾,也只是保證了數(shù)據(jù)的一致性,消息還是丟失了。也就是消費(fèi)端沒有處理成功這條消息,那么就相當(dāng)于丟失了消息。

消息的確認(rèn)模式有三種:

  1. AcknowledgeMode.NONE:自動確認(rèn)。(默認(rèn))
  2. AcknowledgeMode.AUTO:根據(jù)情況確認(rèn)。
  3. AcknowledgeMode.MANUAL:手動確認(rèn)。(推薦)

消費(fèi)者收到消息后,手動調(diào)用 Channel 的 basicAck()/basicReject()/basicNack() 方法后,RabbitMQ 收到消息后,才認(rèn)為本次投遞完成。

  1. basicAck():用于確認(rèn)當(dāng)前消息。
  2. basicReject():用于拒絕當(dāng)前消息,可以自定義是否重回隊(duì)列。
  3. basicNack():用于批量拒絕消息(這是 AMPQ 0-9-1 的 RabbitMQ 擴(kuò)展)。
1)basicAck() 方法

basicAck() 方法 用于確認(rèn)當(dāng)前消息Channel 類中的方法定義如下:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

參數(shù)說明:

  • long deliveryTag: 當(dāng)一個消費(fèi)者向 RabbitMQ 注冊后,會建立起一個 Channel。RabbitMQ 會用 basic.deliver 方法向消費(fèi)者推送消息,這個方法攜帶了一個 deliveryTag,它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識ID,是一個單調(diào)遞增的正整數(shù),deliveryTag 的范圍僅限于當(dāng)前 Channel。
  • boolean multiple: 是否批處理,一般為 false,當(dāng)該參數(shù)為 true 時,則可以一次性確認(rèn) deliveryTag 小于等于傳入值的所有消息。
2)basicReject() 方法

basicReject() 方法 用于明確拒絕當(dāng)前的消息。RabbitMQ 在 2.0.0 版本開始引入,Channel 類中的方法定義如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

參數(shù)說明:

  • long deliveryTag: 當(dāng)一個消費(fèi)者向 RabbitMQ 注冊后,會建立起一個 Channel。RabbitMQ 會用 basic.deliver 方法向消費(fèi)者推送消息,這個方法攜帶了一個 deliveryTag,它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識ID,是一個單調(diào)遞增的正整數(shù),deliveryTag 的范圍僅限于當(dāng)前 Channel。
  • boolean requeue: 是否重新放回隊(duì)列。
    • 如果參數(shù)為 true,則 RabbitMQ 會重新將這條消息存入隊(duì)列,以便發(fā)送給下一個訂閱的消費(fèi)者。
    • 如果參數(shù)為 false,則 RabbitMQ 會立即把消息從隊(duì)列中移除,不會把它發(fā)送給新的消費(fèi)者。
3)basicNack() 方法

basicNack() 方法 用于批量拒絕消息。由于 basicReject() 方法一次只能拒絕一條消息,如果想批量拒絕消息,則可以使用 basicNack() 方法。Channel 類中的方法定義如下:

參數(shù)說明:

  • long deliveryTag: 當(dāng)一個消費(fèi)者向 RabbitMQ 注冊后,會建立起一個 Channel。RabbitMQ 會用 basic.deliver 方法向消費(fèi)者推送消息,這個方法攜帶了一個 deliveryTag,它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識ID,是一個單調(diào)遞增的正整數(shù),deliveryTag 的范圍僅限于當(dāng)前 Channel。
  • boolean multiple: 是否批處理,一般為 false,當(dāng)該參數(shù)為 true 時,則可以一次性確認(rèn) deliveryTag 小于等于傳入值的所有消息。
  • boolean requeue: 是否重新放回隊(duì)列。
    • 如果參數(shù)為 true,則 RabbitMQ 會重新將這條消息存入隊(duì)列,以便發(fā)送給下一個訂閱的消費(fèi)者。
    • 如果參數(shù)為 false,則 RabbitMQ 會立即把消息從隊(duì)列中移除,不會把它發(fā)送給新的消費(fèi)者。
4)代碼實(shí)現(xiàn)
a.配置方式一:代碼配置

如果我們之前配置了 Jackson2JsonMessageConverter.java 的序列化方式,那么我們可以接著指定消費(fèi)方的消息確認(rèn)模式為 AcknowledgeMode.MANUL。

/*** 消息監(jiān)聽配置*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 設(shè)置連接工廠factory.setConnectionFactory(connectionFactory);// 設(shè)置消息確認(rèn)模式factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 設(shè)置反序列化factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;
}
b.配置方式二:配置文件

我們可以直接在 application.yml 中進(jìn)行如下配置:

# 確認(rèn)模式,默認(rèn)auto,自動確認(rèn);manual:手動確認(rèn)
spring.rabbitmq.listener.simple.acknowledge-mode=manual

注意: yaml中指定的是消費(fèi)端容器的默認(rèn)配置,如果我們在代碼中有自定義注入 RabbitListenerContainerFactory 示例之后,還需要使用默認(rèn)配置,需要在代碼中進(jìn)行設(shè)置,如下所示:

@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer configurer;/*** 消息監(jiān)聽配置*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 設(shè)置連接工廠factory.setConnectionFactory(connectionFactory);// 采用yaml中的配置configurer.configure(factory, connectionFactory);// 設(shè)置反序列化factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;
}
c.生產(chǎn)者

SendMessageController.java

import com.demo.config.RabbitDirectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** <p> @Title SendMessageController* <p> @Description 推送消息接口** @author ACGkaka* @date 2023/1/12 15:23*/
@Slf4j
@RestController
public class SendMessageController {/*** 使用 RabbitTemplate,這提供了接收/發(fā)送等方法。*/@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")public String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "Hello world.";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);// 將消息攜帶綁定鍵值:TEST_DIRECT_ROUTING,發(fā)送到交換機(jī):TEST_DIRECT_EXCHANGErabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map);return "OK";}}
d.消費(fèi)者

DirectReceiver.java

import com.demo.config.RabbitDirectConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;/*** <p> @Title DirectReceiver* <p> @Description 直連交換機(jī)監(jiān)聽類** @author ACGkaka* @date 2023/1/12 15:59*/
@Slf4j
@Component
public class DirectReceiver {@RabbitListener(queues = RabbitDirectConfig.DIRECT_QUEUE_NAME)public void process(Map<String, Object> testMessage, Message message, Channel channel) throws IOException {try {log.info("DirectReceiver消費(fèi)者收到消息: {}", testMessage.toString());// 手動答應(yīng)消費(fèi)完成,從隊(duì)列中刪除該消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {log.error("DirectReceiver消費(fèi)者消費(fèi)失敗,原因: {}", e.getMessage(), e);// 手動答應(yīng)消費(fèi)完成,從隊(duì)列中刪除該消息(不重回隊(duì)列)channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}}
e.測試結(jié)果

場景一:消費(fèi)者進(jìn)行手動確認(rèn),生產(chǎn)者推送2條消息:

可以看到,生產(chǎn)者推送2條消息后立馬被消費(fèi)了。

在這里插入圖片描述

場景二:消費(fèi)者不進(jìn)行手動確認(rèn),生產(chǎn)者推送2條消息:

雖然消費(fèi)者消費(fèi)完畢,但是由于沒有進(jìn)行手動確認(rèn),所以2條消息會一直處于 Unacked 狀態(tài),直到消費(fèi)者下線。

在這里插入圖片描述

關(guān)閉 SpringBoot 程序,消費(fèi)者下線后,消息由 Unacked 狀態(tài)轉(zhuǎn)為 Ready 狀態(tài),等待下一個消費(fèi)者上線后重新進(jìn)行消費(fèi)。

在這里插入圖片描述

整理完畢,完結(jié)撒花~ 🌻





參考地址:

1.RabbitMQ(4):消息確認(rèn)機(jī)制詳解,https://juejin.cn/post/7029232312197840904

2.RabbitMQ消息確認(rèn)機(jī)制(ACK),https://blog.csdn.net/pan_junbiao/article/details/112956537

3.RabbitMQ高級,https://blog.csdn.net/hnhroot/article/details/125921527

4.關(guān)于rabbitMQ在yml配置手動ack不生效,重復(fù)答應(yīng)的問題,https://blog.csdn.net/love_Saber_Archer/article/details/109111088

http://aloenet.com.cn/news/38240.html

相關(guān)文章:

  • 網(wǎng)站開發(fā)主要用什么語言武漢seo關(guān)鍵詞排名優(yōu)化
  • 服務(wù)器 無法訪問網(wǎng)站上海網(wǎng)絡(luò)推廣外包公司
  • 深圳建筑工程招聘信息凱里seo排名優(yōu)化
  • 最好的自助建站系統(tǒng)店鋪運(yùn)營方案策劃
  • 網(wǎng)站建設(shè)開發(fā)的條件軟文營銷實(shí)施背景
  • 在北京做網(wǎng)站制作一個月多少錢百度快照收錄入口
  • 東莞汽車網(wǎng)站建設(shè)百度指數(shù)排名
  • 網(wǎng)站建設(shè)實(shí)力滴滴友鏈
  • 深圳市招聘信息網(wǎng)站佛山網(wǎng)絡(luò)排名優(yōu)化
  • 網(wǎng)站建設(shè)包含哪些方面刷關(guān)鍵詞排名軟件有用嗎
  • 深度網(wǎng)營銷型網(wǎng)站建設(shè)公司怎么樣seo網(wǎng)站優(yōu)化
  • 東莞網(wǎng)站設(shè)計(jì)電話網(wǎng)址查詢服務(wù)中心
  • 重慶廣告網(wǎng)站推廣seo結(jié)算系統(tǒng)
  • 高端網(wǎng)站建設(shè)公司價格關(guān)鍵詞優(yōu)化價格
  • 免費(fèi)咨詢律師不收費(fèi)的平臺長治seo顧問
  • 阿里巴巴招聘官網(wǎng)西安關(guān)鍵詞優(yōu)化平臺
  • 做特殊單頁的網(wǎng)站怎樣建立一個網(wǎng)絡(luò)銷售平臺
  • 自己做靜態(tài)網(wǎng)站的步驟百度搜索技巧
  • 建站寶盒做的網(wǎng)站十大免費(fèi)excel網(wǎng)站
  • 公司做網(wǎng)站之前要準(zhǔn)備什么軟件廣點(diǎn)通廣告投放平臺登錄
  • 什么網(wǎng)站做電氣自動化兼職做優(yōu)化的網(wǎng)站
  • 做博物館網(wǎng)站最重要性seo視頻教學(xué)網(wǎng)站
  • 深圳做網(wǎng)站有哪些指數(shù)函數(shù)
  • 設(shè)計(jì)師做網(wǎng)站的流程西安網(wǎng)站建設(shè)哪家好
  • 怎么設(shè)計(jì)自己的網(wǎng)站知乎小說推廣對接平臺
  • 專業(yè)汽車網(wǎng)站seo日常工作都做什么的
  • 做網(wǎng)站百度推廣多少錢客戶關(guān)系管理系統(tǒng)
  • 設(shè)計(jì)做網(wǎng)站哪家公司好短期培訓(xùn)班學(xué)什么好
  • 遼寧城鄉(xiāng)建設(shè)部網(wǎng)站首頁網(wǎng)站策劃是干什么的
  • 內(nèi)網(wǎng)小網(wǎng)站的建設(shè)廣州網(wǎng)站運(yùn)營