国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁(yè) > news >正文

品牌廣告設(shè)計(jì)制作公司網(wǎng)站源碼班級(jí)優(yōu)化大師的功能有哪些

品牌廣告設(shè)計(jì)制作公司網(wǎng)站源碼,班級(jí)優(yōu)化大師的功能有哪些,谷歌網(wǎng)站怎么設(shè)置才能打開(kāi)網(wǎng)站,wordpress安全證書(shū)在分布式系統(tǒng)中,消息隊(duì)列通常用于解耦服務(wù),RabbitMQ是一個(gè)廣泛使用的消息隊(duì)列服務(wù)。延遲消息(也稱(chēng)為延時(shí)隊(duì)列或TTL消息)是一種常見(jiàn)的場(chǎng)景應(yīng)用,特別適合處理某些任務(wù)在一段時(shí)間后執(zhí)行的需求,如訂單超時(shí)處理、…

在分布式系統(tǒng)中,消息隊(duì)列通常用于解耦服務(wù),RabbitMQ是一個(gè)廣泛使用的消息隊(duì)列服務(wù)。延遲消息(也稱(chēng)為延時(shí)隊(duì)列或TTL消息)是一種常見(jiàn)的場(chǎng)景應(yīng)用,特別適合處理某些任務(wù)在一段時(shí)間后執(zhí)行的需求,如訂單超時(shí)處理、延時(shí)通知等。

本文將以具體代碼為例,展示如何使用RabbitMQ來(lái)實(shí)現(xiàn)延遲消息處理,涵蓋隊(duì)列和交換機(jī)的配置、消息的發(fā)送與接收以及死信隊(duì)列的處理。

什么是延遲消息?

延遲消息是指消息在發(fā)送到隊(duì)列后,經(jīng)過(guò)設(shè)定的時(shí)間延遲再被消費(fèi)。RabbitMQ 本身沒(méi)有直接支持延遲隊(duì)列的功能,但可以通過(guò) TTL(Time To Live)+ 死信隊(duì)列(Dead Letter Queue, DLQ) 的組合來(lái)實(shí)現(xiàn)。當(dāng)消息超過(guò)TTL(消息存活時(shí)間)后,不會(huì)被立即消費(fèi),而是會(huì)被轉(zhuǎn)發(fā)到綁定的死信隊(duì)列,從而實(shí)現(xiàn)延遲處理。

RabbitMQ中的延遲消息原理

在RabbitMQ中,我們可以通過(guò)以下幾個(gè)概念來(lái)實(shí)現(xiàn)延遲消息:

  1. TTL(Time To Live):可以為隊(duì)列設(shè)置TTL,消息超過(guò)該時(shí)間后會(huì)被標(biāo)記為“死信”。
  2. 死信隊(duì)列(Dead Letter Queue):當(dāng)消息在正常隊(duì)列中過(guò)期或處理失敗時(shí),RabbitMQ可以將它們路由到一個(gè)死信隊(duì)列,死信隊(duì)列可以用來(lái)處理這些過(guò)期或未處理的消息。
  3. x-dead-letter-exchangex-dead-letter-routing-key:可以通過(guò)配置隊(duì)列的參數(shù),將過(guò)期消息發(fā)送到一個(gè)專(zhuān)門(mén)的死信交換器,并根據(jù)指定的路由鍵轉(zhuǎn)發(fā)到死信隊(duì)列。

?

?消息來(lái)到ttl.queue消息隊(duì)列,過(guò)期時(shí)間內(nèi)無(wú)人消費(fèi),消息來(lái)到死信交換機(jī)hmall.direct,在direct.queue消息隊(duì)列無(wú)需等待。

1. RabbitMQ的配置

首先,我們需要配置兩個(gè)隊(duì)列和兩個(gè)交換機(jī):一個(gè)用于存放延時(shí)消息,另一個(gè)用于處理超時(shí)的死信消息。

package com.heima.stroke.configuration;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {// 延遲時(shí)間 單位:毫秒 (這里設(shè)為30秒)private static final long DELAY_TIME = 1000 * 30;// 行程超時(shí)隊(duì)列public static final String STROKE_OVER_QUEUE = "STROKE_OVER_QUEUE";// 行程死信隊(duì)列public static final String STROKE_DEAD_QUEUE = "STROKE_DEAD_QUEUE";// 行程超時(shí)隊(duì)列交換機(jī)public static final String STROKE_OVER_QUEUE_EXCHANGE = "STROKE_OVER_QUEUE_EXCHANGE";// 行程死信隊(duì)列交換機(jī)public static final String STROKE_DEAD_QUEUE_EXCHANGE = "STROKE_DEAD_QUEUE_EXCHANGE";// 行程超時(shí)交換機(jī) Routing Keypublic static final String STROKE_OVER_KEY = "STROKE_OVER_KEY";// 行程死信交換機(jī) Routing Keypublic static final String STROKE_DEAD_KEY = "STROKE_DEAD_KEY";/*** 聲明行程超時(shí)隊(duì)列,并設(shè)置其參數(shù)* x-dead-letter-exchange:綁定的死信交換機(jī)* x-dead-letter-routing-key:死信路由Key* x-message-ttl:消息的過(guò)期時(shí)間*/@Beanpublic Queue strokeOverQueue() {Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", STROKE_DEAD_QUEUE_EXCHANGE);args.put("x-dead-letter-routing-key", STROKE_DEAD_KEY);args.put("x-message-ttl", DELAY_TIME); // 設(shè)置TTL為30秒return QueueBuilder.durable(STROKE_OVER_QUEUE).withArguments(args).build();}@Beanpublic DirectExchange strokeOverQueueExchange() {return new DirectExchange(STROKE_OVER_QUEUE_EXCHANGE);}@Beanpublic Binding bindingStrokeOverDirect() {return BindingBuilder.bind(strokeOverQueue()).to(strokeOverQueueExchange()).with(STROKE_OVER_KEY);}
}

解釋:

TTL設(shè)置:我們通過(guò)x-message-ttl設(shè)置消息的過(guò)期時(shí)間為30秒。

死信隊(duì)列綁定:通過(guò)x-dead-letter-exchangex-dead-letter-routing-key設(shè)置,當(dāng)消息過(guò)期時(shí),它會(huì)被轉(zhuǎn)發(fā)到死信交換機(jī),再路由到死信隊(duì)列。

2. 生產(chǎn)者發(fā)送延遲消息

接下來(lái),我們通過(guò)生產(chǎn)者向超時(shí)隊(duì)列發(fā)送消息,這些消息將在TTL過(guò)期后轉(zhuǎn)發(fā)到死信隊(duì)列。

package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MQProducer {private final static Logger logger = LoggerFactory.getLogger(MQProducer.class);@AutowiredRabbitTemplate rabbitTemplate;/*** 發(fā)送延時(shí)消息到行程超時(shí)隊(duì)列** @param strokeVO 消息體*/public void sendOver(StrokeVO strokeVO) {String mqMessage = JSON.toJSONString(strokeVO);logger.info("send timeout msg:{}", mqMessage);rabbitTemplate.convertAndSend(RabbitConfig.STROKE_OVER_QUEUE_EXCHANGE, RabbitConfig.STROKE_OVER_KEY, mqMessage);}
}

解釋:

sendOver 方法將消息發(fā)送到超時(shí)隊(duì)列,消息將在超時(shí)后進(jìn)入死信隊(duì)列。生產(chǎn)者不需要額外處理TTL或死信的配置,只需發(fā)送消息即可。

3. 消費(fèi)者監(jiān)聽(tīng)死信隊(duì)列

當(dāng)消息超過(guò)TTL后,將會(huì)被轉(zhuǎn)發(fā)到死信隊(duì)列。消費(fèi)者需要監(jiān)聽(tīng)死信隊(duì)列并處理這些消息。

j

package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import com.heima.stroke.handler.StrokeHandler;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;@Component
public class MQConsumer {private final static Logger logger = LoggerFactory.getLogger(MQConsumer.class);@Autowiredprivate StrokeHandler strokeHandler;/*** 監(jiān)聽(tīng)死信隊(duì)列** @param message 消息體* @param channel RabbitMQ的Channel* @param tag 消息的Delivery Tag*/@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = RabbitConfig.STROKE_DEAD_QUEUE, durable = "true"),exchange = @Exchange(value = RabbitConfig.STROKE_DEAD_QUEUE_EXCHANGE),key = RabbitConfig.STROKE_DEAD_KEY)})@RabbitHandlerpublic void processStroke(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {StrokeVO strokeVO = JSON.parseObject(message.getBody(), StrokeVO.class);logger.info("get dead msg:{}", message.getBody());if (strokeVO == null) {return;}try {// 處理超時(shí)的行程消息strokeHandler.timeoutHandel(strokeVO);// 手動(dòng)確認(rèn)消息channel.basicAck(tag, false);} catch (Exception e) {e.printStackTrace();}}
}

解釋:

@RabbitListener 注解綁定了死信隊(duì)列的監(jiān)聽(tīng)器。當(dāng)消息被轉(zhuǎn)發(fā)到死信隊(duì)列時(shí),該消費(fèi)者會(huì)接收到消息。

使用 channel.basicAck(tag, false) 手動(dòng)確認(rèn)消息處理成功,確保消息不會(huì)重復(fù)消費(fèi)。

4. 處理超時(shí)業(yè)務(wù)邏輯

在我們的業(yè)務(wù)中,當(dāng)消息超時(shí)未處理時(shí),將其狀態(tài)設(shè)置為超時(shí)。

public void timeoutHandel(StrokeVO strokeVO) {// 獲取司機(jī)行程ID和乘客行程IDString inviterTripId = strokeVO.getInviterTripId();String inviteeTripId = strokeVO.getInviteeTripId();// 檢查邀請(qǐng)狀態(tài)是否為未確認(rèn)String inviteeStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId);String inviterStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId);if (String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviteeStatus) &&String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviterStatus)) {// 更新為超時(shí)狀態(tài)redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId, String.valueOf(InviteState.TIMEOUT.getCode()));redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId, String.valueOf(InviteState.TIMEOUT.getCode()));}
}

http://aloenet.com.cn/news/34166.html

相關(guān)文章:

  • wordpress使用兩個(gè)主題如何推廣seo
  • 獨(dú)立站都有哪些百度快速排名提升
  • 網(wǎng)站投票活動(dòng)怎么做百度域名注冊(cè)查詢(xún)
  • 沒(méi)有網(wǎng)站怎么做seob站推廣平臺(tái)
  • 網(wǎng)站報(bào)名怎么做市場(chǎng)營(yíng)銷(xiāo)培訓(xùn)
  • 網(wǎng)站目錄鏈接怎么做天津百度推廣電話(huà)
  • 粉色做網(wǎng)站背景圖片競(jìng)價(jià)推廣是什么意思
  • 臨汾哪做網(wǎng)站seo關(guān)鍵詞優(yōu)化推廣哪家好
  • 京東淘寶網(wǎng)站是怎么做的360免費(fèi)做網(wǎng)站
  • 微信鏈接網(wǎng)頁(yè)網(wǎng)站制作網(wǎng)站seo優(yōu)化推廣
  • wordpress quizzin網(wǎng)站怎樣優(yōu)化關(guān)鍵詞好
  • 大興智能網(wǎng)站建設(shè)哪家好企業(yè)營(yíng)銷(xiāo)策劃
  • 吉林省住房城鄉(xiāng)建設(shè)廳網(wǎng)站首頁(yè)什么是搜索推廣
  • 設(shè)計(jì)網(wǎng)站價(jià)格鄭州seo軟件
  • 湖南網(wǎng)絡(luò)公司網(wǎng)站建設(shè)seo教學(xué)平臺(tái)
  • 網(wǎng)站如何運(yùn)營(yíng)賺錢(qián)線(xiàn)下推廣方式都有哪些
  • 慈溪做網(wǎng)站網(wǎng)站打開(kāi)速度優(yōu)化
  • 公安網(wǎng)站建設(shè)公司網(wǎng)站與推廣
  • 莆田做網(wǎng)站的公司住房和城鄉(xiāng)建設(shè)部官網(wǎng)
  • 個(gè)體做外貿(mào)的網(wǎng)站2021百度模擬點(diǎn)擊工具
  • 企業(yè)微信網(wǎng)站建設(shè)東莞做網(wǎng)站哪里好
  • 南通公司網(wǎng)站建設(shè)怎么做網(wǎng)站推廣和宣傳
  • 空調(diào)維修技術(shù)支持東莞網(wǎng)站建設(shè)國(guó)家最新新聞
  • wordpress簡(jiǎn)約企業(yè)主題下載廣州seo技術(shù)外包公司
  • 網(wǎng)絡(luò)推廣合同網(wǎng)站seo優(yōu)化服務(wù)商
  • 北京設(shè)計(jì)院排名前十強(qiáng)湖南網(wǎng)站seo地址
  • 佛山百度網(wǎng)站排名深圳建站公司
  • 惠州網(wǎng)站建設(shè)找惠州邦百度云盤(pán)網(wǎng)頁(yè)登錄入口
  • 查看網(wǎng)站外鏈代碼百度高級(jí)搜索指令
  • 3東莞網(wǎng)站建設(shè)外貿(mào)網(wǎng)站推廣平臺(tái)