微信h5免費制作網(wǎng)站seo優(yōu)化與推廣招聘
簡介
MQ(message queue),從字面意思上看就個 FIFO 先入先出的隊列,只不過隊列中存放的內(nèi)容是 message 而已,它是一種具有接收數(shù)據(jù)、存儲數(shù)據(jù)、發(fā)送數(shù)據(jù)等功能的技術(shù)服務(wù)。
作用:流量削峰、應(yīng)用解耦、異步處理。
生產(chǎn)者將消息發(fā)送到消息隊列中,消息隊列負責(zé)轉(zhuǎn)發(fā)消息給消費者,消費者在處理完消息后會對消息隊列進行應(yīng)答,消息隊列收到應(yīng)答信息會將相應(yīng)的消息進行丟棄。
批量應(yīng)答會導(dǎo)致高并發(fā)時消息的丟失,所以盡力以channel.ack()進行手動應(yīng)答。
docker安裝
- 拉取鏡像并后臺運行
docker run -id --name=rabbitmq -v rabbitmq-home:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=yi -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq
需要將RABBITMQ_DEFAULT_USER、RABBITMQ_DEFAULT_PASS改成自己的用戶名、密碼。
- 開啟manager插件,可以在網(wǎng)頁進行管理。
docker exec -it 容器id /bin/bash #這里可以用docker ps 查詢剛剛開啟的容器id#進入容器后輸入,開啟rabbitmq-plugins enable rabbitmq_management
可以登錄 http://服務(wù)器IP:15672 訪問web管理界面,訪問成功則代表開啟成功。
JAVA環(huán)境搭建
jar包:
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version></dependency></dependencies>
Helloworld實例
生產(chǎn)者
public static void main(String[] args) throws IOException, TimeoutException {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setUsername("yi");connectionFactory.setPassword("123456");//獲取連接Connection connection = connectionFactory.newConnection();//獲取信道,一個連接中有多個信道Channel channel = connection.createChannel();//聲明一個隊列 String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentsAMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world";//(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("發(fā)送成功");}
消費者
public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setUsername("yi");connectionFactory.setPassword("123456");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=(consumerTag,message)->{System.out.println(new String(message.getBody()));};CancelCallback cancelCallback=(String var1)->{System.out.println("消息消費被中斷");};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
工作隊列(任務(wù)隊列)
RabbitMQ默認(rèn)為工作隊列模式,消費者C1,C2為競爭關(guān)系,接收到的消息將輪詢發(fā)送給C1,C2處理,即C1一條C2一條依次循環(huán)。
手動應(yīng)答ack
因為自動應(yīng)答不會考慮消息是否處理成功,所以可能會導(dǎo)致消息丟失,需要在代碼中將自動應(yīng)答改為手動應(yīng)答。批量應(yīng)答在高并發(fā)的時候也容易丟失消息,也應(yīng)該關(guān)閉。
生產(chǎn)者的代碼無需修改。
消費者:
public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("work2 waiting:");DeliverCallback deliverCallback= (String s, Delivery delivery)->{System.out.println(new String(delivery.getBody()));// do something//手動回復(fù)ack,false為關(guān)閉批量應(yīng)答channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(s)->{System.out.println("消息被打斷");};//false表示不自動應(yīng)答ackchannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
不公平分發(fā)
會存在有些線程能力差耗時長,有些能力強耗時短的情況,不公平分發(fā)將實現(xiàn)能者多勞。
設(shè)立channel的basicQos即可實現(xiàn)不公平分發(fā), basicQos的數(shù)值意味著channel的最大存儲上限,channel為1時,消費者最多同時緩存一條待處理消息。
channel.basicQos(1);
發(fā)布確認(rèn)
在開啟隊列持久化、消息持久化后,RabbitMQ服務(wù)器仍然可能在將消息存儲在磁盤前宕機,需要發(fā)布確認(rèn)才能保證消息不丟失,即RabbitMQ在存儲磁盤成功后,發(fā)送確認(rèn)給生產(chǎn)者。
單個發(fā)布確認(rèn)
每條消息存儲在磁盤后進行發(fā)布確認(rèn),只有發(fā)送者在接收到消費者對應(yīng)的發(fā)布確認(rèn)消息后才會給此消費者發(fā)送下一條消息。
public static void publicMsgIndividual()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//開啟持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//開啟發(fā)布確認(rèn)long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes()); boolean flag = channel.waitForConfirms(); //等待發(fā)布確認(rèn)if(flag){System.out.println("消息發(fā)送成功");}}long end = System.currentTimeMillis();System.out.println("發(fā)布1000條耗時:"+(end-begin)+"ms");}
批量發(fā)布確認(rèn)
每發(fā)送100條消息進行一次發(fā)布確認(rèn)。速度快,但是不知道具體是哪一條消息發(fā)送失敗了。
public static void publicMsgIndividual()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//開啟持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//開啟發(fā)布確認(rèn)long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());if(i%100==0){boolean flag = channel.waitForConfirms(); //等待發(fā)布確認(rèn)if(flag){System.out.println("消息發(fā)送成功");}}}long end = System.currentTimeMillis();System.out.println("發(fā)布1000條耗時:"+(end-begin)+"ms");}
異步發(fā)布確認(rèn)
推薦使用,需要加入確認(rèn)發(fā)布監(jiān)聽器confirmListener,并且記錄序列號與消息的關(guān)聯(lián)(ConcurrentSkipListMap)。
public static void publicMsgAsync()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//開啟持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//開啟發(fā)布確認(rèn)// 將序列號與信息相關(guān)聯(lián),ConcurrentSkipListMap concurrentSkipListMap = new ConcurrentSkipListMap<Long,String>();//加入確認(rèn)監(jiān)聽器channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long msgTag, boolean multiply) throws IOException {System.out.println("消息發(fā)送成功:"+msgTag);if(multiply) { //如果是批量確認(rèn),批量刪除//headMap返回小于msgTag的map視圖ConcurrentNavigableMap concurrentNavigableMap = concurrentSkipListMap.headMap(msgTag);//清理已經(jīng)標(biāo)記的MapconcurrentNavigableMap.clear();}else {concurrentSkipListMap.remove(msgTag);}}@Overridepublic void handleNack(long msgTag, boolean multiply) throws IOException {System.out.println("未確認(rèn)的消息:"+concurrentSkipListMap.get(msgTag));}});long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());//記錄發(fā)送的信息與其序列號concurrentSkipListMap.put(channel.getNextPublishSeqNo(),new String(i+" "));}long end = System.currentTimeMillis();System.out.println("發(fā)布1000條耗時:"+(end-begin)+"ms");}
發(fā)布/訂閱模式(fanout交換機)
首先要弄明白交換機和隊列的關(guān)系,交換機負責(zé)信息的接收,通過不同的RountingKey將消息轉(zhuǎn)發(fā)到不同的隊列,每個隊列上的接收者都是競爭關(guān)系(即隊列上的消息只會被處理一次),那么當(dāng)一個交換機對應(yīng)多個隊列時,每個隊列僅有一個消費者,這個時候即發(fā)布/訂閱模式,消息會被每個消費者接收。
生產(chǎn)者代碼:向交換機中發(fā)送消息
public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String next = scanner.next();channel.basicPublish(EXCHANGE_NAME,"", null,next.getBytes());}}
消費者代碼:聲明匿名隊列,將隊列綁定到交換機上,不同的消費者用相同的RountingKey,以便同時接收到消息。
public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //FANOUT煽出,就是發(fā)布訂閱模式String queue = channel.queueDeclare().getQueue(); //聲明匿名隊列channel.queueBind(queue,EXCHANGE_NAME,""); //將隊列綁定到交換機上,RountingKey為“”DeliverCallback deliverCallback=(consumerTag,message)->{System.out.println("接收到消息:"+new String(message.getBody()));};channel.basicConsume(queue,true,deliverCallback, (consumerTag)->{});}
Direct交換機
與fanout模式相比,不同的隊列有不同的Rounting key,通過Rounting Key能夠直接向指定隊列發(fā)送消息。
Topic交換機
rountingKey作為匹配串,發(fā)送消息時,匹配上的則能進行發(fā)送。
routingKey必須是單詞列表,用.隔開。如aa.bb.cc
*可以代表一個單詞 ,#可以代表若干個單詞
比如向rountingKey為aa.orange.rabbit發(fā)送消息,Q1和Q2都能接收到消息,而向aa.orange.bb發(fā)送消息則只有Q1能夠接收到消息。
當(dāng)隊列的rountingKey綁定的#,則相當(dāng)于fanout煽出交換機。
當(dāng)隊列的rountingKey綁定不帶#*時,相當(dāng)于direct交換機。
死信隊列
在隊列中1消息超時、2無法處理、3隊列已滿時,消息會被送入死信隊列。