站酷設(shè)計(jì)官方網(wǎng)站磁力下載
文章目錄
- 1.場(chǎng)景描述
-
- 1.1 場(chǎng)景1
- 1.2 場(chǎng)景2
- 2.原理
- 3.實(shí)戰(zhàn)開發(fā)
-
- 3.1 建表
- 3.2 集成mybatis-plus
- 3.3 集成RabbitMq
-
- 3.3.1 安裝mq
- 3.3.2 springBoot集成mq
- 3.4 具體實(shí)現(xiàn)
-
- 3.4.1 mq配置類
- 3.4.2 生產(chǎn)者
- 3.4.3 消費(fèi)者
1.場(chǎng)景描述
消息中間件是分布式系統(tǒng)常用的組件,無(wú)論是異步化、解耦、削峰等都有廣泛的應(yīng)用價(jià)值。我們通常會(huì)認(rèn)為,消息中間件是一個(gè)可靠的組件——這里所謂的可靠是指,只要我把消息成功投遞到了消息中間件,消息就不會(huì)丟失,即消息肯定會(huì)至少保證消息能被消費(fèi)者成功消費(fèi)一次,這是消息中間件最基本的特性之一,也就是我們常說(shuō)的“AT LEAST ONCE”,即消息至少會(huì)被“成功消費(fèi)一遍”。
1.1 場(chǎng)景1
什么意思呢?舉個(gè)例子:一個(gè)消息M發(fā)送到了消息中間件,消息投遞到了消費(fèi)程序A,A接受到了消息,然后進(jìn)行消費(fèi),但在消費(fèi)到一半的時(shí)候程序重啟了,這時(shí)候這個(gè)消息并沒有標(biāo)記為消費(fèi)成功,這個(gè)消息還會(huì)繼續(xù)投遞給這個(gè)消費(fèi)者,直到其消費(fèi)成功了,消息中間件才會(huì)停止投遞。
這種情景就會(huì)出現(xiàn)消息可能被多次地投遞。
1.2 場(chǎng)景2
還有一種場(chǎng)景是程序A接受到這個(gè)消息M并完成消費(fèi)邏輯之后,正想通知消息中間件“我已經(jīng)消費(fèi)成功了”的時(shí)候,程序就重啟了,那么對(duì)于消息中間件來(lái)說(shuō),這個(gè)消息并沒有成功消費(fèi)過(guò),所以他還會(huì)繼續(xù)投遞。這時(shí)候?qū)τ趹?yīng)用程序A來(lái)說(shuō),看起來(lái)就是這個(gè)消息明明消費(fèi)成功了,但是消息中間件還在重復(fù)投遞。
以上兩個(gè)場(chǎng)景對(duì)于消息隊(duì)列來(lái)說(shuō)就是同一個(gè)messageId的消息重復(fù)投遞下來(lái)了。
我們利用消息id來(lái)判斷消息是否已經(jīng)消費(fèi)過(guò),如果該信息被消費(fèi)過(guò),那么消息表中已經(jīng) 會(huì)有一條數(shù)據(jù),由于消費(fèi)時(shí)會(huì)先執(zhí)行插入操作,此時(shí)會(huì)因?yàn)橹麈I沖突無(wú)法重復(fù)插入,我們就利用這個(gè)原理來(lái)進(jìn)行冪等的控制,消息內(nèi)容可以用json格式來(lái)進(jìn)行傳輸?shù)摹?/p>
3.實(shí)戰(zhàn)開發(fā)
3.1 建表
DROP TABLE IF EXISTS `message_idempotent`;
CREATE TABLE `message_idempotent` (`message_id` varchar(50) NOT NULL COMMENT '消息ID',`message_content` varchar(2000) DEFAULT NULL COMMENT '消息內(nèi)容',`status` int DEFAULT '0' COMMENT '消費(fèi)狀態(tài)(0-未消費(fèi)成功;1-消費(fèi)成功)',`retry_times` int DEFAULT '0' COMMENT '重試次數(shù)',`type` int DEFAULT '0' COMMENT '消費(fèi)類型',PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3.2 集成mybatis-plus
《springBoot集成mybatisPlus》
3.3 集成RabbitMq
3.3.1 安裝mq
推薦使用docker安裝rabbitmq,還未安裝的可以參考以下信息:
- docker安裝
3.3.2 springBoot集成mq
- 1.添加依賴
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
3.4 生產(chǎn)者具體實(shí)現(xiàn)
3.4.1 mq配置類
- DirectRabbitConfig
具體如何開啟可以參考《rabbitMq實(shí)現(xiàn)死信隊(duì)列》
import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitmqConfig {//正常交換機(jī)的名字public final static String EXCHANGE\_NAME = "exchange\_name";//正常隊(duì)列的名字public final static String QUEUE\_NAME="queue\_name";//死信交換機(jī)的名字public final static String EXCHANGE\_DEAD = "exchange\_dead";//死信隊(duì)列的名字public final static String QUEUE\_DEAD="queue\_dead";//死信路由keypublic final static String DEAD\_KEY="dead.key";//創(chuàng)建正常交換機(jī)@Bean(EXCHANGE\_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)//持久化 mq重啟后數(shù)據(jù)還在.durable(true).build();}//創(chuàng)建正常隊(duì)列@Bean(QUEUE\_NAME)public Queue queue(){//正常隊(duì)列和死信進(jìn)行綁定 轉(zhuǎn)發(fā)到 死信隊(duì)列,配置參數(shù)Map<String,Object>map=getMap();return new Queue(QUEUE\_NAME,true,false,false,map);}//正常隊(duì)列綁定正常交換機(jī) 設(shè)置規(guī)則 執(zhí)行綁定 定義路由規(guī)則 requestmaping映射@Beanpublic Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,@Qualifier(EXCHANGE\_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange)//路由規(guī)則.with("app.#").noargs();}//創(chuàng)建死信隊(duì)列@Bean(QUEUE\_DEAD)public Queue queueDead(){return new Queue(QUEUE\_DEAD,true,false,false);}//創(chuàng)建死信交換機(jī)@Bean(EXCHANGE\_DEAD)public Exchange exchangeDead(){return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD).durable(true) //持久化 mq重啟后數(shù)據(jù)還在.build();}//綁定死信隊(duì)列和死信交換機(jī)@Beanpublic Binding deadBinding(){return BindingBuilder.bind(queueDead()).to(exchangeDead())//路由規(guī)則 正常路由key.with(DEAD\_KEY).noargs();}/\*\*獲取死信的配置信息\*\*\*/public Map<String,Object>getMap(){//3種方式 任選其一,選擇其他方式之前,先把交換機(jī)和隊(duì)列刪除了,在啟動(dòng)項(xiàng)目,否則報(bào)錯(cuò)。//方式一Map<String,Object> map=new HashMap<>(16);//死信交換器名稱,過(guò)期或被刪除(因隊(duì)列長(zhǎng)度超長(zhǎng)或因空間超出閾值)的消息可指定發(fā)送到該交換器中;map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);//死信消息路由鍵,在消息發(fā)送到死信交換器時(shí)會(huì)使用該路由鍵,如果不設(shè)置,則使用消息的原來(lái)的路由鍵值map.put("x-dead-letter-routing-key", DEAD\_KEY);//方式二//消息的過(guò)期時(shí)間,單位:毫秒;達(dá)到時(shí)間 放入死信隊(duì)列// map.put("x-message-ttl",5000);//方式三//隊(duì)列最大長(zhǎng)度,超過(guò)該最大值,則將從隊(duì)列頭部開始刪除消息;放入死信隊(duì)列一條數(shù)據(jù)// map.put("x-max-length",3);return map;}}
- 延遲隊(duì)列配置
具體如何開啟可以參考《rabbitMq實(shí)現(xiàn)死信隊(duì)列》
由于rabbitMq中不直接支持死信隊(duì)列,需要我們利用插件rabbitmq_delayed_messgae_exchage進(jìn)行開啟
/*** 定義延遲交換機(jī)*/
@Configuration
public class RabbitMQDelayedConfig {//隊(duì)列private static final String DELAYQUEUE = "delayedqueue";//交換機(jī)private static final String DELAYEXCHANGE = "delayedExchange";@Beanpublic Queue delayqueue(){return new Queue(DELAYQUEUE);}//自定義延遲交換機(jī)@Beanpublic CustomExchange delayedExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");/*** 1、交換機(jī)名稱* 2、交換機(jī)類型* 3、是否需要持久化* 4、是否需要自動(dòng)刪除* 5、其他參數(shù)*/return new CustomExchange(DELAYEXCHANGE,"x-delayed-message",true,false,arguments);}//綁定隊(duì)列和延遲交換機(jī)@Beanpublic Binding delaybinding(){return BindingBuilder.bind(delayqueue()).to(delayedExchange()).with("sectest").noargs();}
}
3.4.2 生產(chǎn)者
- 1.消費(fèi)隊(duì)列的生產(chǎn)者
import com.example.shop.config.RabbitmqConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
public class Sender_Direct {@Autowiredprivate AmqpTemplate rabbitTemplate;/*** 用于消費(fèi)訂單** @param orderId*/public void send2Direct(String orderId) {//創(chuàng)建消費(fèi)對(duì)象,并指定全局唯一ID(這里使用UUID,也可以根據(jù)業(yè)務(wù)規(guī)則生成,只要保證全局唯一即可)MessageProperties messageProperties = new MessageProperties();rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY, "內(nèi)容設(shè)置", message -> {//設(shè)置消息的id為唯一messageProperties.setMessageId(UUID.randomUUID().toString());messageProperties.setContentType("text/plain");messageProperties.setContentEncoding("utf-8");message.getMessageProperties().setMessageId(orderId);return message;});}}
3.4.3 消費(fèi)者
1.開啟手動(dòng)ack配置
spring:application:name: shoprabbitmq:host: 192.168.1.102port: 5673virtual-host: /username: guestpassword: guestlistener:simple:# 表示消費(fèi)者消費(fèi)成功消息以后需要手工的進(jìn)行簽收(ack確認(rèn)),默認(rèn)為 autoacknowledge-mode: manual
消費(fèi)者要配置ack重試機(jī)制,具體參考前幾篇文章,使用的是mysql消息ID的唯一性,有時(shí)候可能生成一樣的訂單,具體的沒有進(jìn)行實(shí)驗(yàn),內(nèi)容是json生成的,可以執(zhí)行業(yè)務(wù)
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.des.Bean.MessageIdempotent;
import com.example.des.Bean.Shop;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class Receiver_Direct {private static final Integer delayTimes = 30;//延時(shí)消費(fèi)時(shí)間,單位:秒@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = {"smsQueue"})public void receiveD(Message message, Channel channel) throws IOException {try {// 獲取消息IdString messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody());//獲取消息//向數(shù)據(jù)庫(kù)插入數(shù)據(jù)MessageIdempotent messageIdempotent = new MessageIdempotent();messageIdempotent.setMessageId(messageId);messageIdempotent.setMessageContent(msg);messageIdempotent.setRetryTimes(0);System.out.println(messageIdempotent.toString());Boolean save = true; //設(shè)置保存成功,消息投遞失敗是在確認(rèn)模式那里if (!save) {//說(shuō)明屬于重重復(fù)請(qǐng)求//1、處理消息內(nèi)容的業(yè)務(wù),解析json數(shù)據(jù)//2、創(chuàng)建訂單,并保存Boolean flag = consumeOrder(new Shop());if (flag){//投入延遲隊(duì)列,如果30分鐘訂單還沒有消費(fèi),就刪除訂單rabbitTemplate.convertAndSend("delayedExchange","sectest",message,message1->{//設(shè)置發(fā)送消息的延長(zhǎng)時(shí)間 單位:ms,表示30分鐘message1.getMessageProperties().setDelay(1000*60*30);return message1;});//更新消息狀態(tài),消費(fèi)成功,channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}else {//延遲投入死信,進(jìn)行重試channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}} else {//1、處理消息內(nèi)容的業(yè)務(wù),解析json數(shù)據(jù)//2、創(chuàng)建訂單,并保存//投入死信隊(duì)列channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}catch (Exception e){System.out.println("錯(cuò)誤信息");}}private boolean consumeOrder(Shop shop) {return true;}@RabbitListener(queues = {" delay.queue.demo.delay.queue"})public void dead(String payload, Message message, Channel channel) throws IOException {System.out.println("死信隊(duì)列:"+payload);//刪除消息 將數(shù)據(jù)庫(kù)狀態(tài)更新為失敗,更新郵件或者消息通知,有時(shí)候可以人工消費(fèi)long deliveryTag=message.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag,true);}@RabbitListener(queues = "delayedqueue")public void receivemsg(Message messages){//查詢有沒有被消費(fèi),也就是更新成功,有時(shí)候需要樂(lè)觀鎖}
}
至此mq的消息重復(fù)以及冪等的信息處理就很完美的解決了,當(dāng)然本文以數(shù)據(jù)庫(kù)為例進(jìn)行實(shí)現(xiàn),感興趣的可以嘗試使用redis來(lái)進(jìn)行實(shí)現(xiàn)