邢臺(tái)哪里提供網(wǎng)站制作網(wǎng)頁(yè)制作的軟件有哪些
目錄
1.TTL
1.1.設(shè)置消息過期時(shí)間
1.2.設(shè)置隊(duì)列過期時(shí)間
2.死信隊(duì)列
2.1.介紹
2.2.演示
3.延遲隊(duì)列
3.1.模擬實(shí)現(xiàn)延遲隊(duì)列
3.2.延遲隊(duì)列插件
4.事務(wù)與消息分發(fā)
4.1.事務(wù)?
4.2.消息分發(fā)
1.TTL
所謂的ttl,就是過期時(shí)間。對(duì)于rabbitmq,可以設(shè)置隊(duì)列和消息的過期時(shí)間
1.1.設(shè)置消息過期時(shí)間
(1)相關(guān)的交換機(jī)和隊(duì)列
//ttl
@Bean("ttlQueue")
public Queue ttlQueue() {return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}@Bean("ttlDirectExchange")
public DirectExchange ttlDirectExchange() {return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).build();
}@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlDirectExchange") Exchange exchange ) {return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
}
(2)生產(chǎn)者 -- 生產(chǎn)消息的時(shí)候設(shè)置ttl
給消息設(shè)置過期時(shí)間,也就是設(shè)置消息對(duì)象的屬性
第一種寫法:
//ttl@RequestMapping("/ttl")public String ttl() {MessagePostProcessor postProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000");//設(shè)置10秒后過期return message;}};rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl","a ttl test",postProcessor);return "ttl";}
第二種寫法:
//ttl
@RequestMapping("/ttl")
public String ttl() {rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl","a ttl test",message -> {message.getMessageProperties().setExpiration("10000");//設(shè)置10秒后過期return message;});return "ttl";
}
(3)效果展示
消息在10s后就會(huì)自動(dòng)刪除 (本人承諾沒有做任何的處理)
(4)設(shè)置消息ttl注意事項(xiàng)
?設(shè)置消息TTL,即時(shí)消息過期,也不會(huì)馬上從隊(duì)列中刪除,而是在即將投遞到消費(fèi)者之前才會(huì)進(jìn)行判定。如果每次都要掃描整個(gè)隊(duì)列就會(huì)很低效。
給A消息設(shè)置30s過期,B消息設(shè)置10s過期,先將消息A存入隊(duì)列再存B消息,此時(shí)B消息30s后才會(huì)被刪除。
以上的兩條消息是同時(shí)消失。
1.2.設(shè)置隊(duì)列過期時(shí)間
給隊(duì)列設(shè)置TTL,指的是隊(duì)列中的消息在TTL后就會(huì)被刪除,而非隊(duì)列被刪除。
(1)設(shè)置交換機(jī)和隊(duì)列
代碼給隊(duì)列設(shè)置ttl,在創(chuàng)建隊(duì)列中調(diào)用 ttl() 方法即可
//隊(duì)列ttl
@Bean("ttlQueue2")
public Queue ttlQueue2() {return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20 * 1000).build();//隊(duì)列中的消息20s后被刪除
}@Bean("ttlDirectExchange2")
public DirectExchange ttlDirectExchange2() {return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).build();
}@Bean("ttlBinding2")
public Binding ttlBinding2(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlDirectExchange") Exchange exchange ) {return BindingBuilder.bind(queue).to(exchange).with("ttl2").noargs();
}
(2)生產(chǎn)者與效果
@RequestMapping("/ttl2")
public String ttl2() {rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl2","a ttl2");return "ttl2";
}
給隊(duì)列設(shè)置ttl,在web界面中就會(huì)有改效果出現(xiàn)。
如果把設(shè)置有ttl的消息發(fā)送到設(shè)置有ttl的隊(duì)列中,那么過期時(shí)間取值小的一個(gè)。
2.死信隊(duì)列
- 所謂死信,就是因?yàn)榉N種原因無法被消費(fèi)的消息。
- 所以死信隊(duì)列,就是用來存放死信的隊(duì)列。
- 死信到達(dá)死信隊(duì)列的交換機(jī)稱為DLX(Dead Letter Exchange)
2.1.介紹
(1)死信隊(duì)列的圖解
正常隊(duì)列中的消息因?yàn)橐恍┰蚓蜁?huì)變成死信,然后經(jīng)過一個(gè)特定的路由交換,最后到達(dá)一個(gè)指定的死信隊(duì)列中,然后再投遞給消費(fèi)者。
(2)消息稱為死信的原因
- 消息被拒絕,且設(shè)置了無法入隊(duì)
- 消息過期
- 隊(duì)列達(dá)到最大長(zhǎng)度
2.2.演示
要演示死信隊(duì)列的情況,就需要有兩種隊(duì)列和兩種交換機(jī)。
(1)聲明交換機(jī)和隊(duì)列
@Configuration
public class DlConfig {//正常的交換機(jī)和隊(duì)列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DL_EXCHANGE)//綁定死信交換機(jī).deadLetterRoutingKey("dlx")//死信交換機(jī)的路由規(guī)則.ttl(10000)//10秒后消息過期.maxLength(10L)//隊(duì)列最大長(zhǎng)度為10.build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") DirectExchange normalExchange, @Qualifier("normalQueue")Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal");}//死信交換機(jī)和隊(duì)列@Bean("dlxQueue")public Queue dlxQueue() {return QueueBuilder.durable(Constant.DL_QUEUE).build();}@Bean("dlxExchange")public DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(Constant.DL_EXCHANGE).build();}@Bean("dlxBinding")public Binding dlxBinding(@Qualifier("dlxExchange") DirectExchange dlxExchange, @Qualifier("dlxQueue")Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx");}}
上面我們知道了消息稱為死信的條件,其中消息過期和超過隊(duì)列最大長(zhǎng)度可以在聲明隊(duì)列時(shí)實(shí)現(xiàn)。所以,有如下的代碼改進(jìn)
//正常的交換機(jī)和隊(duì)列
@Bean("normalQueue")
public Queue normalQueue() {return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DL_EXCHANGE)//綁定死信交換機(jī).deadLetterRoutingKey("dlx")//死信交換機(jī)的路由規(guī)則.ttl(10000)//10秒后消息過期.maxLength(10L)//隊(duì)列最大長(zhǎng)度為10.build();
}
對(duì)于消息被拒,我們?cè)谙M(fèi)者部分進(jìn)行修改就好。
(2)生產(chǎn)者和消費(fèi)者
生產(chǎn)者:可以選擇模擬超出隊(duì)列最大長(zhǎng)度的情況
//dlx死信隊(duì)列
@RequestMapping("/dlx")
public String dlx() {int maxLen = 12;for(int i=0;i<maxLen;i++) {rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","normal and dlx");}return "normal";
}
消費(fèi)者:
@Component
public class DlListener {//監(jiān)聽正常隊(duì)列,模擬拒絕消息//@RabbitListener(queues = Constant.NORMAL_QUEUE)public void normalListener() {System.out.println();}//監(jiān)聽死信隊(duì)列@RabbitListener(queues = Constant.DL_QUEUE)public void dlxListener(Message message) {System.out.println("死信隊(duì)列:"+ new String(message.getBody()));}}
隊(duì)列就變成了這樣子:?
(3)過期消息變成死信
(4)其他情況
比如拒絕消息和長(zhǎng)度超過最大隊(duì)列長(zhǎng)度
3.延遲隊(duì)列
延遲隊(duì)列,指消息發(fā)送到隊(duì)列后,消費(fèi)者并不能馬上拿到消息,而是等待指定的時(shí)間后才能消息該消息。
應(yīng)用場(chǎng)景:
(1)智能家居:比如通過手機(jī)下達(dá)命令控制家里的家居,達(dá)到一定時(shí)間段就自動(dòng)開啟。
(2)日常管理:預(yù)定一個(gè)會(huì)議,在會(huì)議開始前15分鐘就會(huì)通知參加人員
比如,我們經(jīng)常使用的手機(jī)鬧鐘,就是類似于延遲隊(duì)列的效果。
3.1.模擬實(shí)現(xiàn)延遲隊(duì)列
對(duì)于原生的rabbitmq,并沒有實(shí)現(xiàn)延遲隊(duì)列的功能,但是我們可以通過TTL+死信隊(duì)列來模擬實(shí)現(xiàn)。
(1)如何模擬實(shí)現(xiàn)
消費(fèi)者需要訂閱死信隊(duì)列,生產(chǎn)者把延遲的消息放入正常隊(duì)列中,當(dāng)消息過期就會(huì)自動(dòng)進(jìn)入死信隊(duì)列,消費(fèi)者進(jìn)而可以拿到消息。
對(duì)于TTL,我們是設(shè)置消息的TTL,也可以設(shè)置隊(duì)列的過期時(shí)間。
(2)模擬實(shí)現(xiàn)
死信隊(duì)列:
@Configuration
public class DlConfig {//正常的交換機(jī)和隊(duì)列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DL_EXCHANGE)//綁定死信交換機(jī).deadLetterRoutingKey("dlx")//死信交換機(jī)的路由規(guī)則.build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") DirectExchange normalExchange, @Qualifier("normalQueue")Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal");}//死信交換機(jī)和隊(duì)列@Bean("dlxQueue")public Queue dlxQueue() {return QueueBuilder.durable(Constant.DL_QUEUE).build();}@Bean("dlxExchange")public DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(Constant.DL_EXCHANGE).build();}@Bean("dlxBinding")public Binding dlxBinding(@Qualifier("dlxExchange") DirectExchange dlxExchange, @Qualifier("dlxQueue")Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx");}}
生產(chǎn)者:生產(chǎn)者在生產(chǎn)消息的時(shí)候加上過期時(shí)間,也就是TTL
@RequestMapping("/delay")
public String delay() {rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","這是一條延遲消息",message -> {message.getMessageProperties().setExpiration("10000");//設(shè)置10秒后過期return message;});System.out.println("消息發(fā)送時(shí)間:"+new Date());return "delay";
}
消費(fèi)者:消費(fèi)者訂閱死信隊(duì)列
//監(jiān)聽死信隊(duì)列@RabbitListener(queues = Constant.DL_QUEUE)public void dlxListener(Message message) {System.out.println("消費(fèi)時(shí)間: "+new Date() +",死信隊(duì)列:"+ new String(message.getBody()));}
演示結(jié)果:
結(jié)果恰好是10秒后。
(3)存在的缺陷
前面我們知道,當(dāng)多個(gè)攜帶TTL的消息進(jìn)入隊(duì)列中,并且前面消息的TTL大于后面的;那么就會(huì)出現(xiàn),只有前面的消息過期,后面的消息才會(huì)跟著過期,這就是TTL+私信隊(duì)列存在的問題。
所以我們使用一個(gè)插件,使用插件帶來的延遲隊(duì)列進(jìn)行操作。
3.2.延遲隊(duì)列插件
(1)下載插件并啟用
下載地址:這個(gè)頁(yè)面如果點(diǎn)不開,可以使用加速軟件加速
Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
找到.ez文件:點(diǎn)擊就會(huì)下載插件文件
確定安裝目錄:
像ubunto的就使用這兩個(gè)目錄的其中一個(gè)即可:
將文件復(fù)制到目錄下:沒有目錄就創(chuàng)建
找到目錄
安裝插件前:
安裝插件:直接將文件拖拽進(jìn)來即可
安裝插件后:多出來的插件
啟動(dòng)插件:
最后在rabbitmq客戶端查看交換機(jī)類型
這就說明延遲插件啟動(dòng)成功,后續(xù)使用該交換機(jī)即可。
(2)定義交換機(jī)和隊(duì)列
@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constant.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(Constant.DELAY_EXCHANGE).delayed().build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue,@Qualifier("delayExchange") Exchange delayExchange) {return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();}}
生產(chǎn)者:
@RequestMapping("/delay2")
public String delay2() {rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","這是一條延遲消息",message -> {message.getMessageProperties().setExpiration("10000");//設(shè)置10秒后過期return message;});System.out.println("消息發(fā)送時(shí)間:"+new Date());return "delay";
}
消費(fèi)者:
@Component
public class DelayListener {@RabbitListener(queues = Constant.DELAY_QUEUE)public void delayListener() {System.out.println("消息時(shí)間:"+new Date());}}
(3)演示
使用延遲插件就不會(huì)出現(xiàn)像上述TTL+死信隊(duì)列的問題
如果需要關(guān)閉插件,執(zhí)行下面的命令即可:
rabbitmq delayed message exchange
4.事務(wù)與消息分發(fā)
4.1.事務(wù)?
?事務(wù),就是保證發(fā)送消息和接收消息是原子性的,要么全部成功,要么全部失敗
(1)配置事務(wù)管理器
這里就是需要對(duì)AMQP客戶端進(jìn)行設(shè)置屬性
//3.返回事務(wù)
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);//true為開啟事務(wù)return rabbitTemplate;
}@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);//事務(wù)管理器}
后續(xù)使用該對(duì)象就可以完成事務(wù)的操作
(2)準(zhǔn)備隊(duì)列和交換機(jī)
這里使用系統(tǒng)默認(rèn)的交換機(jī)即可
//事務(wù)@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable(Constant.TRANS_QUEUE).build();}
(3)消費(fèi)者
@RequestMapping("/trans")public String trans() {rabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第一條消息");System.out.println("異常前");int a=9/0;//模擬發(fā)送異常rabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第二條消息");return "trans";}
上面的消費(fèi)者是沒有使用事物的
(4)沒有采取事務(wù)
這里指的是既沒有開啟事務(wù),也沒有在方法上加上@Transactional注解
運(yùn)行結(jié)果:
異常前成功發(fā)送消息,異常后的消息沒有進(jìn)行發(fā)送成功。
(5)使用事務(wù)
@Transactional@RequestMapping("/trans")public String trans() {transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第一條消息");System.out.println("異常前");int a=9/0;//模擬發(fā)送異常transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第二條消息");return "trans";}
這個(gè)時(shí)候發(fā)送了異常,隊(duì)列中也是一條消息都沒有的。
(6)事務(wù)小結(jié)
要完成一個(gè)事務(wù)的操作,這三個(gè)操作都不能少
配置對(duì)象和事務(wù)管理器:
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;
}@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);
}
加上@Transactional注解:
@Transactional
@RequestMapping("/trans")
public String trans() {transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第一條消息");//int a=9/0;//模擬發(fā)送異常transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第二條消息");return "trans";
}
還有一個(gè)注意事項(xiàng),使用事務(wù),最好把消息發(fā)送確認(rèn)模式關(guān)閉
4.2.消息分發(fā)
(1)定義
多個(gè)消費(fèi)者訂閱同一個(gè)隊(duì)列時(shí),隊(duì)列會(huì)輪詢給消費(fèi)者分發(fā)消息,每個(gè)消費(fèi)者平均每分鐘拿到的消息數(shù)目是一樣的,這種情況看似挺好的,但是容易出現(xiàn)問題。
當(dāng)每個(gè)消費(fèi)者的消費(fèi)能力不一樣時(shí),消費(fèi)速度慢的,消息就會(huì)積壓;而消費(fèi)速度快的消費(fèi)者,就會(huì)空閑,進(jìn)而影響整體的吞吐量。
所以就有了消息分發(fā),按照一定的規(guī)則,平均每分鐘給不同的消費(fèi)者分發(fā)不同數(shù)量的消息。
對(duì)于消息分發(fā),有兩個(gè)應(yīng)用場(chǎng)景 -- 限流和非公平分發(fā)
(2)限流
消費(fèi)者每次只能拿到一定數(shù)量的消息,只有消費(fèi)并且確認(rèn)后,才能繼續(xù)拿到消息。所以需要配置成手動(dòng)確認(rèn)模式和限流參數(shù)
1)配置
2)相應(yīng)代碼
交換機(jī)和隊(duì)列:
@Configuration
public class QosConfig {@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constant.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE).delayed().build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosQueue") Queue delayQueue, @Qualifier("qosExchange") Exchange delayExchange) {return BindingBuilder.bind(delayQueue).to(delayExchange).with("qos").noargs();}}
生產(chǎn)者:
@RequestMapping("/qos")
public String Qos() {for(int i=0;i<20;i++) {rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE,"qos","a qos test"+i);}return "qos";
}
消費(fèi)者:
@Component
public class QosListener {@RabbitListener(queues = Constant.QOS_QUEUE)public void qosListener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費(fèi):"+new String(message.getBody()));//channel.basicAck(deliveryTag, false); //不進(jìn)行消息確認(rèn)}catch (Exception e){channel.basicNack(deliveryTag, false, false);}}
}
3)演示
一下子往隊(duì)列中發(fā)送20條消息,但是消費(fèi)者一下子只能拿到5條消息
但是沒有確認(rèn),就只有五條消息,也拿不到后續(xù)的消息。
(3)負(fù)載均衡
模擬實(shí)現(xiàn)負(fù)載均衡,可以把限流參數(shù)修改成1,消費(fèi)確認(rèn)完成一條消息才能繼續(xù)拿。
后續(xù)代碼跟上述是差不多了