国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

禹州做網(wǎng)站bz3399百度一下你就知道百度官網(wǎng)

禹州做網(wǎng)站bz3399,百度一下你就知道百度官網(wǎng),商業(yè)策劃公司,建網(wǎng)站多少錢合適一、springboot整合RabbitMQ(jdk17)(創(chuàng)建兩個項目,一個生產(chǎn)者項目,一個消費者項目) 上面使用原生JAVA操作RabbitMQ較為繁瑣,很多的代碼都是重復(fù)書寫的,使用springboot可以簡化代碼的…

一、springboot整合RabbitMQ(jdk17)(創(chuàng)建兩個項目,一個生產(chǎn)者項目,一個消費者項目)

  1. 上面使用原生JAVA操作RabbitMQ較為繁瑣,很多的代碼都是重復(fù)書寫的,使用springboot可以簡化代碼的編寫。

生產(chǎn)者項目

在這里插入圖片描述

第一步:創(chuàng)建springboot工程,然后引入rabbitmq的依賴

<!-- RabbitMQ起步依賴 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:編寫配置文件

spring:rabbitmq:host: 192.168.70.130  # 虛擬機的地址port: 5672username: adminpassword: adminvirtual-host: /#日志格式
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

第三步:編寫RabbitMQ的配置類

@Configuration
public class RabbitmqConfig1 {private final String EXCHANGE_NAME = "boot_exchange";private final String QUEUE_NAME = "boot_queue";private final String ROUTE_NAME = "boot_route";//創(chuàng)建交換機@Bean(EXCHANGE_NAME)public Exchange getExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//創(chuàng)建隊列@Bean(QUEUE_NAME)public Queue getQueue(){return new Queue(QUEUE_NAME);}//交換機和隊列綁定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTE_NAME).noargs();}
}

第四步:編寫發(fā)送消息測試類

//編寫發(fā)送消息測試類
@SpringBootTest
public class RabbitmqTest {// 注入RabbitTemplate工具類@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage(){/*** 發(fā)送消息* 參數(shù)1:交換機* 參數(shù)2:路由key* 參數(shù)3:要發(fā)送的消息*/rabbitTemplate.convertAndSend("boot_exchange","boot_route","你好我有一個毛衫");System.out.println("發(fā)送消息成功");}
}

消費者項目

在這里插入圖片描述

第一步:創(chuàng)建springboot工程,然后引入rabbitmq的依賴

<!-- RabbitMQ起步依賴 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:編寫配置文件

spring:rabbitmq:host: 192.168.70.130  # 虛擬機的地址port: 5672username: adminpassword: adminvirtual-host: /#日志格式
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

第三步:編寫消費者,監(jiān)聽隊列

@Component
public class Consumer1 {/*** 監(jiān)聽隊列* @param message* queues表示監(jiān)聽的隊列的名稱*/@RabbitListener(queues = "boot_queue")public void listener(String message){System.out.println("接受到消息 = " + message);}
}

4、rabbitmq的消息可靠性

  1. RabbitMQ消息投遞的路徑為:
    生產(chǎn)者--->交換機--->隊列--->消費者

  2. 在RabbitMQ工作的過程中,每個環(huán)節(jié)消息都可能傳遞失敗,那么RabbitMQ是如何監(jiān)聽消息是否成功投遞的呢?

      1. 確認(rèn)模式(confirm):可以監(jiān)聽消息是否從生產(chǎn)者成功傳遞到交換機。
      1. 退回模式(return):可以監(jiān)聽消息是否從交換機成功傳遞到隊列。
      1. 消費者消息確認(rèn)(Consumer Ack):可以監(jiān)聽消費者是否成功處理消息。

【一】rabbitmq的消息可靠性——確認(rèn)模式

  1. 確認(rèn)模式(confirm):可以監(jiān)聽消息是否從生產(chǎn)者成功傳遞到交換機。
  2. 創(chuàng)建一個新的生產(chǎn)者項目,導(dǎo)入mq(上面的第一步操作)依賴進行開發(fā):(也可以在原來的基礎(chǔ)上修改信息)
    • 代碼組成和上面的生產(chǎn)者項目是一樣的,也是三步內(nèi)容。
第一步:修改配置文件

只是添加了一句代碼
在這里插入圖片描述

spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: / # 表示使用默認(rèn)的virtual-host#開啟確認(rèn)模式publisher-confirm-type: correlated#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第二步:在生產(chǎn)者的配置類創(chuàng)建交換機和隊列(RabbitMQ的配置類)
@Configuration
public class RabbitmqConfig2Confirm {public final String EXCHANGE_NAME = "confirm_exchange";public final String QUEUE_NAME = "confirm_queue";public final String ROUTING_NAME = "confirm_routing";//    創(chuàng)建交換機@Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//    創(chuàng)建隊列@Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME).build();}
//    創(chuàng)建交換機和隊列綁定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第三步:編寫測試類發(fā)生消息:生產(chǎn)者定義確認(rèn)模式的回調(diào)方法(springboot的測試類,能夠加載到第二步的配置類)
 @Testvoid testConfirm() {//回調(diào)確認(rèn)rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 配置信息* @param b 是否成功,true 是 ,false 否* @param s 失敗原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("發(fā)送成功");}else{System.out.println("發(fā)送失敗,原因:"+s);}}});//發(fā)送消息/*** 發(fā)送消息* 參數(shù)1:交換機* 參數(shù)2:路由key* 參數(shù)3:要發(fā)送的消息*/rabbitTemplate.convertAndSend("confirm_exchange","confirm_routing","send message...confirm");}

由于rabbitmq的confirm確認(rèn)模式是確認(rèn)消息是否從生產(chǎn)者成功傳遞到交換機的,所以就沒必要寫消費者進行信息的消費了

  • 當(dāng)我們執(zhí)行測試類的時候,先執(zhí)行rabbitTemplate.convertAndSend(“confirm_exchange”,“confirm_routing”,“send message…confirm”);,無論消息是否成功發(fā)送都會調(diào)用 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()方法,如果發(fā)送成功則執(zhí)行if語句的代碼,如果發(fā)送失敗則調(diào)用else語句的代碼。
    • 根據(jù)執(zhí)行的是if或者else的語句,就能判斷消息是否成功傳遞到交換機了。

【二】rabbitmq的消息可靠性——退回模式

  1. 退回模式(return):可以監(jiān)聽消息是否從交換機成功傳遞到隊列。
  2. 創(chuàng)建一個新的生產(chǎn)者項目,導(dǎo)入mq(上面的第一步操作)依賴進行開發(fā):(也可以在原來的基礎(chǔ)上修改信息)
    • 代碼組成和上面的生產(chǎn)者項目是一樣的,也是三步內(nèi)容。
第一步:修改配置文件

只是添加了一句
在這里插入圖片描述

# rabbitmq???
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#開啟確認(rèn)模式publisher-confirm-type: correlated#開始回退模式publisher-returns: true#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第二步:編寫配置類(RabbitMQ的配置類)
@Configuration
public class RabbitmqConfig3Return {public final String EXCHANGE_NAME = "return_exchange";public final String QUEUE_NAME = "return_queue";public final String ROUTING_NAME = "return_routing";
//    創(chuàng)建交換機@Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//    創(chuàng)建隊列@Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME).build();}//    創(chuàng)建交換機和隊列綁定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第三步:編寫測試類發(fā)生消息:生產(chǎn)者定義退回模式的回調(diào)方法(springboot的測試類,能夠加載到第二步的配置類)
@Testvoid testReturnSendMessage(){
//        調(diào)用回退模式的回調(diào)方法,只有失敗才會回調(diào),成功不會回調(diào)哦
// 失敗后將失敗信息封裝到參數(shù)中rabbitTemplate.setReturnsCallback(returned ->{Message message = returned.getMessage();System.out.println("消息對象:"+message);System.out.println("錯誤碼:"+returned.getReplyCode());System.out.println("錯誤信息:"+returned.getReplyText());System.out.println("交換機:"+returned.getExchange());System.out.println("路由鍵:"+returned.getRoutingKey());});//        發(fā)送消息/*** 發(fā)送消息* 參數(shù)1:交換機* 參數(shù)2:路由key* 參數(shù)3:要發(fā)送的消息*/rabbitTemplate.convertAndSend("return_exchange","return_routing","send message...return");}

由于rabbitmq的return回退模式是確認(rèn)消息是否從交換機成功傳遞到隊列的,還沒有傳遞到消費者,所以就沒必要寫消費者進行信息的消費了

  • 當(dāng)我們執(zhí)行測試類的時候,先執(zhí)行rabbitTemplate.convertAndSend(“return_exchange”,“return_routing”,“send message…return”);,如果消息成功發(fā)送到隊列上則不會調(diào)用 rabbitTemplate.setReturnsCallback方法,如果發(fā)送步成功則調(diào)用回調(diào)方法rabbitTemplate.setReturnsCallback,
    • 根據(jù)運行結(jié)果就可以知道在傳遞消息到隊列上的時候哪里發(fā)生錯誤了

在這里插入圖片描述

【三】rabbitmq的消息可靠性——消費者消息確認(rèn)(Consumer Ack)

  1. 在RabbitMQ中,消費者接收到消息后會向隊列發(fā)送確認(rèn)簽收的消息,只有確認(rèn)簽收的消息才會被移除隊列。這種機制稱為消費者消息確認(rèn)(Consumer Acknowledge,簡稱Ack)
    • 類似快遞員派送快遞也需要我們簽收,否則一直存在于快遞公司的系統(tǒng)中。
  2. 消費者消息確認(rèn)(Consumer Acknowledge,簡稱Ack)分為自動確認(rèn)手動確認(rèn)。
    • 自動確認(rèn)指消息只要被消費者接收到,無論是否成功處理消息,則自動簽收,并將消息從隊列中移除。
    • 但是在實際開發(fā)中,收到消息后可能業(yè)務(wù)處理出現(xiàn)異常,那么消息就會丟失。此時需要設(shè)置手動簽收,即在業(yè)務(wù)處理成功后再通知簽收消息,如果出現(xiàn)異常,則拒簽消息,讓消息依然保留在隊列當(dāng)中。

● 自動確認(rèn):spring.rabbitmq.listener.simple.acknowledge=“none”
● 手動確認(rèn):spring.rabbitmq.listener.simple.acknowledge=“manual”

  1. 創(chuàng)建一個新的生產(chǎn)者項目和新的消費者項目,導(dǎo)入mq(上面的第一步操作)依賴進行開發(fā):(也可以在原來的基礎(chǔ)上修改信息)
    • 代碼組成和上面的生產(chǎn)者項目是一樣的,也是三步內(nèi)容。
生產(chǎn)者項目:第一步:修改配置文件

不用修改

# rabbitmq???
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#開啟確認(rèn)模式publisher-confirm-type: correlated#開始回退模式publisher-returns: true#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
生產(chǎn)者項目:第二步:編寫配置類(RabbitMQ的配置類)
@Configuration
public class RabbitmqConfig4ACK {public final String EXCHANGE_NAME = "ack_exchange";public final String QUEUE_NAME = "ack_queue";public final String ROUTING_NAME = "ack_routing";
//    創(chuàng)建交換機@Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//    創(chuàng)建隊列@Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME).build();}//    創(chuàng)建交換機和隊列綁定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
生產(chǎn)者項目:第三步:編寫測試類發(fā)生消息:(springboot的測試類,能夠加載到第二步的配置類)
 @Testvoid testAck(){//        發(fā)送消息rabbitTemplate.convertAndSend("ack_exchange","ack_routing","send message...ack");}
消費者項目(自動確認(rèn)):第一步:修改配置文件
  • 消費者消息確認(rèn)——自動確認(rèn)的配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#開動手動簽收listener:simple:acknowledge-mode: none   # 默認(rèn)就是自動確認(rèn)
#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

在這里插入圖片描述

  • 自動簽收模式就是:消息只要被消費者接收到,無論是否成功處理消息,則自動簽收,并將消息從隊列中移除。當(dāng)我們拿到消息的時候,業(yè)務(wù)出現(xiàn)異常了,所以無法正確處理消息,導(dǎo)致消息丟失了。
消費者項目(自動確認(rèn)):第二步:編寫消費者類,監(jiān)聽隊列
  • 自動確認(rèn)的消費者類
@Component
public class AckConsumer {
//    自動簽收@RabbitListener(queues = "ack_queue")public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        獲取消息String s = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(s);
//        TODO,處理事務(wù)
//        故意出錯int i= 1/0;}}
消費者項目(手動確認(rèn)):第一步:修改配置文件
  • 消費者消息確認(rèn)——手動確認(rèn)的配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#開動手動簽收listener:simple:acknowledge-mode: manual  
#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
消費者項目(手動確認(rèn)):第二步:編寫消費者類,監(jiān)聽隊列
  • 手動確認(rèn)
@Component
public class AckConsumer {//    手動簽收@RabbitListener(queues = "ack_queue")public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 消息投遞序號,消息每次投遞該值都會+1long deliveryTag = message.getMessageProperties().getDeliveryTag();try {
//            int i = 1/0; //模擬處理消息出現(xiàn)bugSystem.out.println("成功接受到消息:"+message);// 簽收消息/*** 參數(shù)1:消息投遞序號* 參數(shù)2:是否一次可以簽收多條消息*/channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println("消息消費失敗!");Thread.sleep(2000);// 拒簽消息/*** 參數(shù)1:消息投遞序號* 參數(shù)2:是否一次可以拒簽多條消息* 參數(shù)3:拒簽后消息是否重回隊列*/channel.basicNack(deliveryTag,true,true);}}
}

在這里插入圖片描述

在這里插入圖片描述

  • 手動簽收模式就是:如果出現(xiàn)異常,則拒簽消息,讓消息依然保留在隊列當(dāng)中。方便下次請求能夠請求到這次因為異常而沒有接收到的消息。

【四】RabbitMQ高級特性——消費端限流

在這里插入圖片描述

  • 前面說過MQ可以對請求進行“削峰填谷”,即通過消費端限流的方式限制消息的拉取速度,達到保護消費端的目的。
  • 使用【三】rabbitmq的消息可靠性——消費者消息確認(rèn)(Consumer Ack)的項目,消費者使用手動確認(rèn)模式的代碼即可(但是要修改配置文件)
第一步:先在生產(chǎn)者項目中,發(fā)送多個消息
@Testpublic void testLimitSendBatch() {// 發(fā)送十條消息for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "這是第"+i+"條消息");}}
第二步:修改消費者項目的配置文件

最主要就是配置文件的修改:
在這里插入圖片描述

spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#開動手動簽收listener:simple:acknowledge-mode: manual  #none是默認(rèn)的prefetch: 5  # 每次消費者從隊列拉取的消息數(shù)量(限制)#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第三步:重新編寫消費者類
@Component
public class ConsumerLimit {
//    手動簽收@RabbitListener(queues = "limit_queue")public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        獲取消息String s = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(s);//        模擬業(yè)務(wù)處理Thread.sleep(3000);long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手動簽收channel.basicAck(deliveryTag,true);}}
  • 其實就是修改了消費者項目的配置文件,添加一條配置信息,限制消費者消息的拉取速度。
    在這里插入圖片描述

【五】RabbitMQ高級特性——利用限流實現(xiàn)不公平分發(fā)

  1. 在RabbitMQ中,多個消費者監(jiān)聽同一條隊列,則隊列默認(rèn)采用的輪詢分發(fā)。但是在某種場景下這種策略并不是很好,例如消費者1處理任務(wù)的速度非???#xff0c;而其他消費者處理速度卻很慢。此時如果采用公平分發(fā),則消費者1有很大一部分時間處于空閑狀態(tài)。此時可以采用不公平分發(fā),即誰處理的快,誰處理的消息多。
  • 在【四】RabbitMQ高級特性——消費端限流的基礎(chǔ)上,修改一消費者項目的配置文件,然后在消費者類中多寫幾個監(jiān)聽消息的方法(或者多寫幾個消費者類)。
第一步:修改消費者項目的配置文件

最主要就是配置文件的修改:
在這里插入圖片描述

spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#開動手動簽收listener:simple:acknowledge-mode: manual  #none是默認(rèn)的prefetch: 1  #  消費端最多拉取1條消息消費,這樣誰處理的快誰拉取下一條消息,實現(xiàn)了不公平分發(fā)#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第二步:修改消費者類,編寫多個監(jiān)聽方法
@Component
public class ConsumerUnfair {
//  消費者1@RabbitListener(queues = "ack_queue")public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        獲取消息String s = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("消費者1"+s);Thread.sleep(3000);long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手動簽收channel.basicAck(deliveryTag,true);}//    消費者2@RabbitListener(queues = "ack_queue")public void listenMessage2(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        獲取消息String s = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("消費者2"+s);Thread.sleep(1000);long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手動簽收channel.basicAck(deliveryTag,true);}// .......監(jiān)聽方法
}
  • 最主要的就是消費者項目的配置文件的修改: 配置消費端最多拉取1條消息消費,這樣誰處理的快誰拉取下一條消息,實現(xiàn)了不公平分發(fā)。

【六】RabbitMQ高級特性——消息存活時間

  1. RabbitMQ可以設(shè)置消息的存活時間(Time To Live,簡稱TTL),當(dāng)消息到達存活時間后還沒有被消費,會被移出隊列。RabbitMQ可以對隊列的所有消息設(shè)置存活時間,也可以對某條消息設(shè)置存活時間。
  • 使用【三】rabbitmq的消息可靠性——消費者消息確認(rèn)(Consumer Ack)的項目,消費者使用手動確認(rèn)模式的代碼
第一步:修改生產(chǎn)者項目的配置類

在這里插入圖片描述

@Configuration
public class RabbitmqConfig7ttl {public final String EXCHANGE_NAME = "ack_exchange";public final String QUEUE_NAME = "ack_queue";public final String ROUTING_NAME = "ack_routing";
//    創(chuàng)建交換機@Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//    創(chuàng)建隊列@Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME)
//                設(shè)置隊列的超時的時間,單位是毫秒.ttl(10000).build();}//    創(chuàng)建交換機和隊列綁定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第二步:修改生產(chǎn)者項目的測試類

設(shè)置單條消息存活時間
在這里插入圖片描述

 @Testpublic void testTtlSendBatch() {// 發(fā)送十條消息for (int i = 0; i < 100; i++) {if (i%5 == 0) {//設(shè)置消息屬性MessageProperties messageProperties = new MessageProperties();//設(shè)置存活時間messageProperties.setExpiration("10000");// 創(chuàng)建消息對象(可以配置消息的一些配置)Message message = new Message(("這是第" + i + "條消息").getBytes(StandardCharsets.UTF_8), messageProperties);// 發(fā)送消息rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", message);}else {rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "這是第" + i + "條消息");}}}
    1. 如果設(shè)置了單條消息的存活時間,也設(shè)置了隊列的存活時間,以時間的為準(zhǔn)。
    1. 消息過期后,并不會馬上移除消息,只有消息消費到隊列頂端時,才會移除該消息

【七】RabbitMQ高級特性——優(yōu)先級隊列

  1. 假設(shè)在電商系統(tǒng)中有一個訂單催付的場景,即客戶在一段時間內(nèi)未付款會給用戶推送一條短信提醒,但是系統(tǒng)中分為大型商家和小型商家。比如像蘋果,小米這樣大商家一年能給我們創(chuàng)造很大的利潤,所以在訂單量大時,他們的訂單必須得到優(yōu)先處理,此時就需要為不同的消息設(shè)置不同的優(yōu)先級,此時我們要使用優(yōu)先級隊列。
  • 使用【三】rabbitmq的消息可靠性——消費者消息確認(rèn)(Consumer Ack)的項目,消費者使用手動確認(rèn)模式的代碼
第一步:修改生產(chǎn)者項目的配置類

在這里插入圖片描述

@Configuration
public class RabbitmqConfig8Priority {public final String EXCHANGE_NAME = "priority_exchange";public final String QUEUE_NAME = "priority_queue";public final String ROUTING_NAME = "priority_routing";
//    創(chuàng)建交換機@Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//    創(chuàng)建隊列@Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME)
//                設(shè)置隊列的優(yōu)先級,值越大優(yōu)先級越高,一般不超過10.maxPriority(10).build();}//    創(chuàng)建交換機和隊列綁定@Beanpublic Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第二步:修改生產(chǎn)者項目的測試
 @Testpublic void testPrioritySendBatch() {// 發(fā)送十條消息for (int i = 0; i < 100; i++) {if (i%5 == 0) {//設(shè)置消息屬性MessageProperties messageProperties = new MessageProperties();
//             設(shè)置優(yōu)先級messageProperties.setPriority(9);// 創(chuàng)建消息對象(可以配置消息的一些配置)Message message = new Message(("這是第" + i + "條消息").getBytes(StandardCharsets.UTF_8), messageProperties);// 發(fā)送消息rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", message);}else {rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", "這是第" + i + "條消息");}}}
  • 設(shè)置了消息的優(yōu)先級,那么消費者項目在消費消息的時候就會優(yōu)先消費等級高的消息。

【八】RabbitMQ高級特性——死信隊列

  1. 在MQ中,當(dāng)消息成為死信(Dead message)后,消息中間件可以將其從當(dāng)前隊列發(fā)送到另一個隊列中,當(dāng)前隊列就是死信隊列。而在RabbitMQ中,由于有交換機的概念,實際是將死信發(fā)送給了死信交換機(Dead Letter Exchange,簡稱DLX)。死信交換機和死信隊列和普通的沒有區(qū)別。
    在這里插入圖片描述
  2. 消息成為死信的情況:
      1. 隊列消息長度到達限制。
      1. 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊列,requeue=false;
      1. 消息到達存活時間未被消費。
生產(chǎn)者項目:第一步:修改配置文件
# rabbitmq???
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#開啟確認(rèn)模式publisher-confirm-type: correlated#開始回退模式publisher-returns: true#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
生產(chǎn)者項目:第二步:編寫配置類(RabbitMQ的配置類)
@Configuration
public class RabbitmqConfig9Dead {//    死信private final String DEAD_EXCHANGE = "dead_exchange";private final String DEAD_QUEUE = "dead_queue";private final String DEAD_ROUTING = "dead_routing";// 死信交換機@Bean(DEAD_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();}// 死信隊列@Bean(DEAD_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE).build();}// 死信交換機綁定死信隊列@Beanpublic Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,@Qualifier(DEAD_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING).noargs();}// 普通private final String NORMAL_EXCHANGE = "normal_exchange";private final String NORMAL_QUEUE = "normal_queue";private final String NORMAL_ROUTING = "normal_routing";// 普通交換機@Bean(NORMAL_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).durable(true).build();}// 普通隊列@Bean(NORMAL_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE) // 綁定死信交換機.deadLetterRoutingKey(DEAD_ROUTING) // 死信隊列路由關(guān)鍵字.ttl(10000) // 消息存活10s.maxLength(10) // 隊列最大長度為10.build();}// 普通交換機綁定普通隊列@Beanpublic Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING).noargs();}
}
生產(chǎn)者項目:第三步:編寫測試類發(fā)生消息:(springboot的測試類,能夠加載到第二步的配置類)
@Test
public void testDlx(){// 存活時間過期后變成死信//     rabbitTemplate.convertAndSend("normal_exchange","normal_routing","測試死信");// 超過隊列長度后變成死信//     for (int i = 0; i < 20; i++) {//       rabbitTemplate.convertAndSend("normal_exchange","normal_routing","測試死信");//     }// 消息拒簽但不返回原隊列后變成死信rabbitTemplate.convertAndSend("normal_exchange","normal_routing","測試死信");
}
消費者項目(手動確認(rèn)):第一步:修改配置文件
  • 消費者消息確認(rèn)——手動確認(rèn)的配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#開動手動簽收listener:simple:acknowledge-mode: manual  
#????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
消費者項目(手動確認(rèn)):第二步:編寫消費者類,監(jiān)聽隊列
  • 手動確認(rèn)
@Component
public class ConsumerDead {@RabbitListener(queues = "normal_queue")public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        獲取消息String s = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("消費者1"+s);Thread.sleep(500);long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 拒絕簽收channel.basicNack(deliveryTag,true,false);}
  • 死信隊列小結(jié)
      1. 死信交換機和死信隊列和普通的沒有區(qū)別
      1. 當(dāng)消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列
      1. 消息成為死信的三種情況:
        1. 隊列消息長度到達限制;
        1. 消費者拒接消費消息,并且不重回隊列;
        1. 原隊列存在消息過期設(shè)置,消息到達超時時間未被消費;

【九】RabbitMQ高級特性——延遲隊列

  1. 延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。
    • 例如:
        1. 下單后,30分鐘未支付,取消訂單,回滾庫存。
        1. 新用戶注冊成功7天后,發(fā)送短信問候。
        • 實現(xiàn)方式:
            1. 定時器
            1. 延遲隊列
              在這里插入圖片描述
  • RabbitMQ中并未提供延遲隊列功能,我們可以使用死信隊列實現(xiàn)延遲隊列的效果。
    在這里插入圖片描述
    1. 延遲隊列 指消息進入隊列后,可以被延遲一定時間,再進行消費。
    1. RabbitMQ沒有提供延遲隊列功能,但是可以使用 : TTL + DLX 來實現(xiàn)延遲隊列效果。
      在這里插入圖片描述
第一步:創(chuàng)建springboot項目并添加依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
第二步:編寫配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#開動手動簽收listener:simple:acknowledge-mode: manual
# ????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第三步:編寫配置類
@Configuration
public class RabbitMQConfig {private final String DEAD_EXCHANGE = "order_expire_exchange";private final String DEAD_QUEUE = "order_expire_queue";private final String DEAD_ROUTING = "order_expire_routing";private final String ORDER_EXCHANGE = "order_exchange";private final String ORDER_QUEUE = "order_queue";private final String ORDER_ROUTING = "order_routing";// 死信交換機@Bean(DEAD_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();}// 死信隊列@Bean(DEAD_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE).build();}// 死信交換機綁定死信隊列@Beanpublic Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING).noargs();}// 普通交換機@Bean(ORDER_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();}// 普通隊列@Bean(ORDER_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(ORDER_QUEUE).deadLetterExchange(DEAD_EXCHANGE) // 綁定死信交換機.deadLetterRoutingKey(DEAD_ROUTING) // 死信隊列路由關(guān)鍵字.ttl(10000) // 消息存活10s(模擬30min超時).build();}// 普通交換機綁定普通隊列@Beanpublic Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,@Qualifier(ORDER_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(ORDER_ROUTING).noargs();}
}
第四步:創(chuàng)建控制器,完成下單功能
@RestController
public class OrderController {//注入MQ@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/addOrder")public String addOrder(){//生成訂單號String orderNumber = "2030061812251234";//在service層完成訂單邏輯//將訂單號發(fā)送到訂單mq,30分鐘過期進入死信隊列,死信隊列消費查詢訂單支付狀態(tài),做對應(yīng)處理rabbitTemplate.convertAndSend("order_exchange","order_routing",orderNumber);return "下單成功! 您的訂單號為 :"+orderNumber;}
}
第五步:創(chuàng)建消費者,監(jiān)聽消息
@Component
public class ListenerOrder {//監(jiān)聽訂單過期隊列@RabbitListener(queues = "order_expire_queue")public void orderListener(String orderId){System.out.println("orderId = " + orderId);//根據(jù)訂單id查詢訂單狀態(tài)是否支付/*** 監(jiān)聽死信隊列的類,回去30min超時訂單號,根據(jù)訂單號查詢訂單的支付狀態(tài)* 支付:走下一步流程* 未支付:關(guān)閉訂單,庫存回滾*/}
}
手動簽收模式的結(jié)果
  • 在手動簽收模式的時候,當(dāng)我們啟動項目,訪問訂單功能時,立刻生成了一個隊列消息
    在這里插入圖片描述
  • 然后因為是手動簽收模式,所以在消息的存活時間過去了之后,成為了死信消息,所以被消息被拒收了,但是還存在隊列中。
    在這里插入圖片描述
自動簽收模式的結(jié)果
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#開動自動簽收listener:simple:acknowledge-mode: none   # 默認(rèn)的
# ????
logging:pattern:console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
  • 在自動簽收模式的時候,當(dāng)我們啟動項目,訪問訂單功能時,立刻生成了一個隊列消息
    在這里插入圖片描述
  • 因為是自動簽收的,所以消息過了存活時間之后就沒了(自動確認(rèn)指消息只要被消費者接收到,無論是否成功處理消息,則自動簽收,并將消息從隊列中移除)
    在這里插入圖片描述

RabbitMQ一、RabbitMQ的介紹與安裝(docker)

RabbitMQ二、RabbitMQ的六種模式

http://aloenet.com.cn/news/30918.html

相關(guān)文章:

  • 蘭州市建設(shè)廳網(wǎng)站2021年經(jīng)典營銷案例
  • eclipse做網(wǎng)站怎么優(yōu)化網(wǎng)站性能
  • 什么網(wǎng)站可以做兼職美工教育培訓(xùn)報名
  • 標(biāo)書制作員工資很低嗎seo關(guān)鍵詞排名優(yōu)化費用
  • 做網(wǎng)站營銷蘭州seo外包公司
  • 四川成都私人網(wǎng)站建設(shè)百度問答app下載
  • 企業(yè)網(wǎng)站建設(shè)個人博客2345網(wǎng)址導(dǎo)航設(shè)為主頁
  • 公司做網(wǎng)站流程快速seo關(guān)鍵詞優(yōu)化技巧
  • 幫人做網(wǎng)站一個多少錢環(huán)球網(wǎng)疫情最新動態(tài)
  • 廣東省住房和建設(shè)局官方網(wǎng)站百度搜索引擎工作原理
  • 小說主題+wordpressseo網(wǎng)站優(yōu)化知識
  • 個人手機網(wǎng)站開發(fā)站長工具日本
  • 網(wǎng)站雙語怎么做免費的編程自學(xué)網(wǎng)站
  • 鮮花網(wǎng)站建設(shè)的目標(biāo)百度賬號
  • 企業(yè)建站平臺哪個好深圳有實力的seo公司
  • 網(wǎng)站規(guī)劃與建設(shè)步驟愛站網(wǎng)收錄
  • 網(wǎng)站建設(shè) 柳州青島網(wǎng)站建設(shè)微動力
  • 個人網(wǎng)站設(shè)計與制作設(shè)計思路合肥網(wǎng)絡(luò)推廣有限公司
  • wordpress 網(wǎng)銀支付seo專業(yè)培訓(xùn)課程
  • 免費做自我介紹網(wǎng)站網(wǎng)站流量分析
  • 青島定制網(wǎng)站建設(shè)關(guān)鍵詞優(yōu)化排名公司
  • 昆明制作企業(yè)網(wǎng)站的公司競價托管的注意事項
  • 惠州做網(wǎng)站公司哪家好競價推廣價格
  • 小程序 微網(wǎng)站南寧網(wǎng)站關(guān)鍵詞推廣
  • 做網(wǎng)站的圖片Pc端和手機端的區(qū)別青島愛城市網(wǎng)app官方網(wǎng)站
  • 官方網(wǎng)站如何做外貿(mào)seo推廣招聘
  • 網(wǎng)上訂酒店 網(wǎng)站開發(fā)百度知道客服電話
  • 軟件開發(fā)工具有哪些基本功能搜索引擎優(yōu)化師工資
  • 怎樣用php做網(wǎng)站北京seo地址
  • 網(wǎng)站空間租用多少錢南寧網(wǎng)