wordpress主題插件seo工程師
消息何去何從
mandatory和immediate是channel.basicPublish方法的兩個(gè)參數(shù),都有消息傳遞過程中不可達(dá)目的地時(shí)將消息返回給生產(chǎn)者的功能。
mandatory參數(shù)
- true:交換器無法根據(jù)自身的類型 和路由鍵找到符合條件的隊(duì)列,rabbitmq調(diào)用Basic.Return命令將消息返回給生產(chǎn)者
- 生產(chǎn)者調(diào)用channel.addReturnListener添加ReturnListener監(jiān)聽器實(shí)現(xiàn)
- false:消息直接丟棄
immediate參數(shù)
告訴服務(wù)器至少將該消息路由到一個(gè)隊(duì)列中,否則將消息返回給生產(chǎn)者。
Rabbitmq3.0去掉了對immediate參數(shù)支持,建議采用TTL和DLX方法替代
- true:如果交換器在將消息路由到隊(duì)列時(shí)發(fā)現(xiàn)隊(duì)列上并不存在任何消費(fèi)者,這條消息將不會存入隊(duì)列中,當(dāng)與路由鍵匹配的所有隊(duì)列都沒有消費(fèi)者時(shí),該消息會通過Basic.Return返回至生產(chǎn)者。
備份交換器(AE)
生產(chǎn)者發(fā)送消息不設(shè)置mandatory,消息未被路由會丟失,設(shè)置了,需要添加ReturnListener。如果不想編程復(fù)雜也不想消息丟失使用備份交換器。
使用:
- 聲明交換器時(shí)添加alternate-exchange參數(shù)實(shí)現(xiàn)
- channel.exchangeDeclare(“myAe”,“fanout”,true,false,null);
- channel.queueDeclare(“unroutedQueue”,true,false,false,null);
- channel.queueBind(“unroutedQueue”,“myAe”,“”);
- 通過策略(Policy)實(shí)現(xiàn)
- rabbitmqctl set_policy AE “^normalExchange$” ‘{“alternate-exchange”:“myAE”}’
特殊情況:
- 如果設(shè)置了備份交換器不存在,客戶端和RabbitMQ服務(wù)端都不會有異常出現(xiàn),此時(shí)消息會丟失
- 如果備份交換器沒有綁定任何隊(duì)列,客戶端和rabbitmq服務(wù)端都不會有異常出現(xiàn),此時(shí)消息會丟失
- 如果備份交換器沒有任何匹配的隊(duì)列,客戶端和rabbitmq服務(wù)端都不會有異常出現(xiàn),此時(shí)消息會丟失
- 如果備份交換器和mandatory參數(shù)一起使用,那么mandatory參數(shù)無效
過期時(shí)間(Time to Live ,TTL)
設(shè)置消息TTL
-
通過隊(duì)列屬性設(shè)置,隊(duì)列中所有消息都有相同的過期時(shí)間(一旦過期,就從隊(duì)列中抹去,消息已經(jīng)在隊(duì)列頭部,只要定期從隊(duì)列頭部開始掃描即可)
-
channel.queueDeclare方法中加入x-message-ttl參數(shù)實(shí)現(xiàn),單位ms
-
Map<String,Object> args = new HashMap<String,Object>(); args.put("x-message-ttl",6000); channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);
-
-
通過Policy方式設(shè)置ttl
-
rabbitmqctl set_policy TTL ".*" '{"message-ttl":6000}' --apply-to queue
-
-
通過調(diào)用http api接口設(shè)置
-
-
通過對消息本身單獨(dú)設(shè)置,每條消息的ttl可以不同(即使過期,也不會馬上抹去,是否過期是在即將投遞到消費(fèi)者之前判定的)
- 代碼設(shè)置
- 設(shè)置AMQP.BasicProperties屬性
- set屬性:deliveryMode(持久化消息),expiration(ttl時(shí)間)
- 通過 http api接口設(shè)置
- 代碼設(shè)置
-
如果兩個(gè)方法一起使用,消息的ttl以兩者之間較小的數(shù)值為準(zhǔn),消息在隊(duì)列中一旦超過設(shè)置的ttl時(shí),就會變成死信,消費(fèi)者將無法再收到該消息
-
不設(shè)置ttl,表示此消息不會過期,ttl=0,表示除非此時(shí)可以直接將消息投遞到消費(fèi)者,否則立即丟棄。
設(shè)置隊(duì)列TTL
channel.queueDeclare方法中的x-expires參數(shù)可以控制隊(duì)列被自動(dòng)刪除前處于未使用狀態(tài)的時(shí)間(未使用:隊(duì)列上沒有任何消費(fèi)者,隊(duì)列也沒有被重新聲明,并在過期時(shí)間段內(nèi)也未調(diào)用過Basic.Get命令)
Map<String, Object> args = new HashMap<String,Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue",false,false,false,args);
死信隊(duì)列
當(dāng)消息在一個(gè)隊(duì)列中變成死信后,能被重新被發(fā)送到另一個(gè)交換器中,這個(gè)交換器就是DLX,死信交換器,綁定DLX的隊(duì)列就是死信隊(duì)列
消息變成死信情況
- 消息被拒絕
- 消息過期
- 隊(duì)列達(dá)到最大長度
當(dāng)隊(duì)列中存在死信時(shí),rabbitmq會自動(dòng)將這個(gè)消息重新發(fā)布到設(shè)置的DLX上去,進(jìn)而被路由到另一個(gè)隊(duì)列,死信隊(duì)列,可以監(jiān)聽這個(gè)隊(duì)列的消息進(jìn)行相應(yīng)處理
設(shè)置方法
- 代碼設(shè)置:
- channel.queueDeclare方法中設(shè)置x-dead-letter-exchange為隊(duì)列添加DLX
- 通過Policy方式設(shè)置
延遲隊(duì)列
延遲隊(duì)列存儲的對象是對應(yīng)的延遲消息(當(dāng)消息被發(fā)送以后,并不想讓消費(fèi)者立刻拿到消息,而是等待特定時(shí)間后,消費(fèi)者才能拿到這個(gè)消息進(jìn)行消費(fèi))。
場景:
- 訂單系統(tǒng),30min內(nèi)未支付,進(jìn)行異常處理
- 手機(jī)遙控設(shè)備指定時(shí)間工作
通過DLX 和TTL模擬延遲隊(duì)列的功能。
假設(shè)一個(gè)應(yīng)用中需要將每條消息都設(shè)置為10秒延遲,生產(chǎn)者通過exchange.normal交換器將發(fā)送的消息存儲在queue.normal隊(duì)列,消費(fèi)者訂閱的是queue.dlx隊(duì)列,當(dāng)消息從queue.normal整個(gè)隊(duì)列中過期之后被存入queue.dlx隊(duì)列,消費(fèi)者恰巧消費(fèi)到了延遲10秒的這條消息。
優(yōu)先級隊(duì)列
實(shí)現(xiàn):通過設(shè)置隊(duì)列的x-max-priority參數(shù)實(shí)現(xiàn)
默認(rèn)最低優(yōu)先級為0,越高越優(yōu)先消費(fèi)
前提:如果在消費(fèi)速度大于生成者的速度且broker中沒有消息堆積的情況下,對發(fā)送的消息設(shè)置優(yōu)先級就沒什么意義了。
RPC實(shí)現(xiàn)
客戶端發(fā)送請求消息,服務(wù)端回復(fù)響應(yīng)的消息,為了接收響應(yīng)的消息,需要在請求消息中發(fā)送一個(gè)回調(diào)隊(duì)列
String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("","rpc_queue",props,message.getBytes());
- replyTo:用來設(shè)置一個(gè)回調(diào)隊(duì)列
- correlationId:用來關(guān)聯(lián)請求和其調(diào)用RPC之后的回復(fù),每一個(gè)請求設(shè)置一個(gè)唯一的correlationId
可以為每一個(gè)客戶端創(chuàng)建一個(gè)單一的回調(diào)隊(duì)列。
持久化
- 交換器持久化:通過在聲明隊(duì)列是將durable參數(shù)置為true實(shí)現(xiàn)的。如果不持久化,rabbitmq服務(wù)重啟后,相關(guān)的交換器元數(shù)據(jù)會丟失,消息不丟失,只是不能將消息發(fā)送到這個(gè)交換器中了。
- 隊(duì)列持久化:通過在聲明隊(duì)列時(shí)將durable置為true,如果不設(shè)置持久化,rabbitmq重啟后,相關(guān)隊(duì)列元數(shù)據(jù)會丟失,此時(shí)數(shù)據(jù)也會丟失。
- 消息持久化:通過將消息的投遞模式BasicProperties中的diliveryMode屬性設(shè)置為2即可實(shí)現(xiàn)消息的持久化
- 設(shè)置了隊(duì)列和消息的持久化,rabbitmq服務(wù)重啟后,消息依舊存在。
將交換器、隊(duì)列、消息都設(shè)置持久化后不能保證數(shù)據(jù)百分百丟失。
生產(chǎn)者確認(rèn)
確定消息到底有沒有正確到達(dá)服務(wù)器??梢酝ㄟ^事務(wù)機(jī)制和發(fā)送方確認(rèn)機(jī)制
事務(wù)機(jī)制
rabbitmq客戶端與事務(wù)機(jī)制相關(guān)方法
- channel.txSelect:用于當(dāng)前的信道設(shè)置成事務(wù)模式
- channel.txCommit:用于提交事務(wù)
- channel.txRollback:用于事務(wù)回滾
開啟事務(wù)流程
- 客戶端發(fā)送Tx.select,將信道置為事務(wù)模式
- Broker回復(fù)Tx.Select-Ok,確認(rèn)已將信道置為事務(wù)模式
- 在發(fā)送完消息后,客戶端發(fā)送Tx.Commit提交事務(wù)
- Broker回復(fù)Tx.Commit-Ok,確認(rèn)提交事務(wù)
- 如果發(fā)生異常,在捕獲異常后,channel.txRollback()回滾
缺點(diǎn):會有性能損失
發(fā)送方確認(rèn)機(jī)制
- 生產(chǎn)者將信道設(shè)置成confirm模式(channel.confirmSelect),rabbitmq同意:Confirm.Select-Ok;
- 一旦信道進(jìn)入confirm模式,所有在該信道上發(fā)布的消息都會被指派一個(gè)唯一id
- 一旦消息被投遞到匹配的隊(duì)列后,rabbitmq會發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息唯一id),使得生產(chǎn)者知曉消息已經(jīng)正確到達(dá)了目的地。
事務(wù)機(jī)制在一條消息發(fā)送后會使發(fā)送端阻塞,等待rabbitmq回應(yīng)后才發(fā)下一條消息,而發(fā)送發(fā)確認(rèn)機(jī)制最大好處是異步的。生產(chǎn)者通過回調(diào)方法處理該確認(rèn)消息。如果rabbitmq因自身內(nèi)部錯(cuò)誤導(dǎo)致消息丟失,會發(fā)送一條nack命令,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理nack命令。
publisher confirm優(yōu)勢
- 批量confirm方法,每發(fā)送一批消息后,調(diào)用channel.waitForConfirms方法,等待服務(wù)器的確認(rèn)返回
- 異步confirm方法:提供一個(gè)回調(diào)方法,服務(wù)端確認(rèn)了一條或多條消息后客戶端會回調(diào)這個(gè)方法進(jìn)行處理。
消費(fèi)端要點(diǎn)介紹
消息分發(fā)
rabbitmq隊(duì)列擁有多個(gè)消費(fèi)者時(shí),隊(duì)列收到的消息將以輪詢的分發(fā)方式發(fā)送給消費(fèi)者,每條消息只會發(fā)送給訂閱列表里的一個(gè)消費(fèi)者。
- 問題:如果某些空閑,某些忙碌造成整體下降
- 方法:channel.basicQos方法允許限制信道上的消費(fèi)者所能保持的最大未確認(rèn)消息的數(shù)量。如果達(dá)到上限,就不會向這個(gè)消費(fèi)者再發(fā)送任何消息,知道消費(fèi)者確認(rèn)了某條消息后,相應(yīng)計(jì)數(shù)減1,之后消費(fèi)者可以繼續(xù)接受消息。
消息順序性
指消費(fèi)者消費(fèi)到的消息和發(fā)送者發(fā)布的消息的順序是一致的。
打破順序性的情形
- 如果生產(chǎn)者使用了事務(wù)機(jī)制,發(fā)送消息遇到異常進(jìn)行了事務(wù)回滾,需重新補(bǔ)償發(fā)送,如果是另一個(gè)線程實(shí)現(xiàn),則出現(xiàn)亂序。
- 如果生產(chǎn)者發(fā)送的消息設(shè)置了不同的超時(shí)時(shí)間,并設(shè)置了死信隊(duì)列,順序不一致。
- 設(shè)置了優(yōu)先級,也不是順序的。
要保證消息的順序性,需要業(yè)務(wù)方使用rabbitmq之后進(jìn)一步處理,例如在消息體內(nèi)添加全局有序標(biāo)識實(shí)現(xiàn)。
棄用QueueingConsumer
缺陷
- 內(nèi)存溢出問題:隊(duì)列堆積較多的消息,導(dǎo)致消費(fèi)者客戶端內(nèi)存溢出假死,不斷堆積
- 使用Basic.Qos限制某個(gè)消費(fèi)者所保持未確認(rèn)消息的數(shù)量。
- 會拖累同一個(gè)connection下的所有信道,性能降低
- 同步遞歸調(diào)用QueueingConsumer會產(chǎn)生死鎖
- rabbitmq的自動(dòng)連接恢復(fù)機(jī)制不支持Queueing Consumer這種形式
- QueueingConsumer不是事件驅(qū)動(dòng)的
消息傳輸保障
一般消息中間件消息傳輸保障分為三個(gè)層級
- 最多一次
- 最少一次
- 恰好一次
rabbitmq支持其中的最多一次和最少一次,其中最少一次投遞實(shí)現(xiàn)需要考慮
- 消息生產(chǎn)者需要開啟事務(wù)機(jī)制或publisher confirm機(jī)制,以確保消息可以可靠地傳輸?shù)絩abbitmq中
- 消息生產(chǎn)者需要配合使用mandatory參數(shù)或者備份交換器來確保消息能夠從交換器路由到隊(duì)列中,進(jìn)而能夠保存下來而不會被丟棄
- 消息和隊(duì)列都需要進(jìn)行持久化處理,以確保rabbitmq服務(wù)器在遇到異常情況時(shí)不會造成消息丟失
- 消費(fèi)者在消費(fèi)消息的同時(shí)需要將autoAck設(shè)置為false,然后通過手動(dòng)確認(rèn)的方式去確認(rèn)已經(jīng)正確消費(fèi)的消息,以避免在消費(fèi)端引起不必要的消息丟失。
參考:《RabbitMQ實(shí)戰(zhàn)指南》