小組做數(shù)據(jù)庫網(wǎng)站成都網(wǎng)站快速排名
1.初識MQ
1.1.同步和異步通訊
微服務(wù)間通訊有同步和異步兩種方式:
同步通訊:就像打電話,需要實時響應(yīng)。
異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。
兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是你卻不能跟多個人同時通話。發(fā)送郵件可以同時與多個人收發(fā)郵件,但是往往響應(yīng)會有延遲。
1.1.1.同步通訊
我們之前學(xué)習(xí)的Feign調(diào)用就屬于同步方式,雖然調(diào)用可以實時得到結(jié)果,但存在下面的問題:
總結(jié):
同步調(diào)用的優(yōu)點:
- 時效性較強,可以立即得到結(jié)果
同步調(diào)用的問題:
- 耦合度高
- 性能和吞吐能力下降
- 有額外的資源消耗
- 有級聯(lián)失敗問題
1.1.2.異步通訊
異步調(diào)用則可以避免上述問題:
我們以購買商品為例,用戶支付后需要調(diào)用訂單服務(wù)完成訂單狀態(tài)修改,調(diào)用物流服務(wù),從倉庫分配響應(yīng)的庫存并準(zhǔn)備發(fā)貨。
在事件模式中,支付服務(wù)是事件發(fā)布者(publisher),在支付完成后只需要發(fā)布一個支付成功的事件(event),事件中帶上訂單id。
訂單服務(wù)和物流服務(wù)是事件訂閱者(Consumer),訂閱支付成功的事件,監(jiān)聽到事件后完成自己業(yè)務(wù)即可。
為了解除事件發(fā)布者與訂閱者之間的耦合,兩者并不是直接通信,而是有一個中間人(Broker)。發(fā)布者發(fā)布事件到Broker,不關(guān)心誰來訂閱事件。訂閱者從Broker訂閱事件,不關(guān)心誰發(fā)來的消息。
Broker 是一個像數(shù)據(jù)總線一樣的東西,所有的服務(wù)要接收數(shù)據(jù)和發(fā)送數(shù)據(jù)都發(fā)到這個總線上,這個總線就像協(xié)議一樣,讓服務(wù)間的通訊變得標(biāo)準(zhǔn)和可控。
好處:
-
吞吐量提升:無需等待訂閱者處理完成,響應(yīng)更快速
-
故障隔離:服務(wù)沒有直接調(diào)用,不存在級聯(lián)失敗問題
-
調(diào)用間沒有阻塞,不會造成無效的資源占用
-
耦合度極低,每個服務(wù)都可以靈活插拔,可替換
-
流量削峰:不管發(fā)布事件的流量波動多大,都由Broker接收,訂閱者可以按照自己的速度去處理事件
缺點:
- 架構(gòu)復(fù)雜了,業(yè)務(wù)沒有明顯的流程線,不好管理
- 需要依賴于Broker的可靠、安全、性能
1.2.技術(shù)對比:
MQ,中文是消息隊列(MessageQueue),字面來看就是存放消息的隊列。也就是事件驅(qū)動架構(gòu)中的Broker。
比較常見的MQ實現(xiàn):
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
幾種常見MQ的對比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社區(qū) | Rabbit | Apache | 阿里 | Apache |
開發(fā)語言 | Erlang | Java | Java | Scala&Java |
協(xié)議支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協(xié)議 | 自定義協(xié)議 |
可用性 | 高 | 一般 | 高 | 高 |
單機吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內(nèi) |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延遲:RabbitMQ、Kafka
2.快速入門
2.1.安裝RabbitMQ
安裝RabbitMQ,參考:RabbitMQ部署指南
MQ的基本結(jié)構(gòu):
RabbitMQ中的一些角色:
- publisher:生產(chǎn)者
- consumer:消費者
- exchange個:交換機,負責(zé)消息路由
- queue:隊列,存儲消息
- virtualHost:虛擬主機,隔離不同租戶的exchange、queue、消息的隔離
2.2.RabbitMQ消息模型
RabbitMQ官方提供了5個不同的Demo示例,對應(yīng)了不同的消息模型:
2.3.入門案例
簡單隊列模式的模型圖:
官方的HelloWorld是基于最基礎(chǔ)的消息隊列模型來實現(xiàn)的,只包括三個角色:
- publisher:消息發(fā)布者,將消息發(fā)送到隊列queue
- queue:消息隊列,負責(zé)接受并緩存消息
- consumer:訂閱隊列,處理隊列中的消息
2.3.1.publisher實現(xiàn)
思路:
- 建立連接
- 創(chuàng)建Channel
- 聲明隊列
- 發(fā)送消息
- 關(guān)閉連接和channel
代碼實現(xiàn):
package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立連接ConnectionFactory factory = new ConnectionFactory();// 1.1.設(shè)置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("123456");// 1.2.建立連接Connection connection = factory.newConnection();// 2.創(chuàng)建通道ChannelChannel channel = connection.createChannel();// 3.創(chuàng)建隊列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.發(fā)送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("發(fā)送消息成功:【" + message + "】");// 5.關(guān)閉通道和連接channel.close();connection.close();}
}
2.3.2.consumer實現(xiàn)
代碼思路:
- 建立連接
- 創(chuàng)建Channel
- 聲明隊列
- 訂閱消息
代碼實現(xiàn):
package cn.itcast.mq.helloworld;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立連接ConnectionFactory factory = new ConnectionFactory();// 1.1.設(shè)置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("123456");// 1.2.建立連接Connection connection = factory.newConnection();// 2.創(chuàng)建通道ChannelChannel channel = connection.createChannel();// 3.創(chuàng)建隊列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.訂閱消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.處理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}
2.4.總結(jié)
基本消息隊列的消息發(fā)送流程:
-
建立connection
-
創(chuàng)建channel
-
利用channel聲明隊列
-
利用channel向隊列發(fā)送消息
基本消息隊列的消息接收流程:
-
建立connection
-
創(chuàng)建channel
-
利用channel聲明隊列
-
定義consumer的消費行為handleDelivery()
-
利用channel將消費者與隊列綁定
3.SpringAMQP
SpringAMQP是基于RabbitMQ封裝的一套模板,并且還利用SpringBoot對其實現(xiàn)了自動裝配,使用起來非常方便。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
SpringAMQP提供了三個功能:
- 自動聲明隊列、交換機及其綁定關(guān)系
- 基于注解的監(jiān)聽器模式,異步接收消息
- 封裝了RabbitTemplate工具,用于發(fā)送消息
3.1.Basic Queue 簡單隊列模型
在父工程mq-demo中引入依賴
<!--AMQP依賴,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.1.1.消息發(fā)送
首先配置MQ地址,在publisher服務(wù)的application.yml中添加配置:
spring:rabbitmq:host: 192.168.150.101 # 主機名port: 5672 # 端口virtual-host: / # 虛擬主機username: root # 用戶名password: 123456 # 密碼
然后在publisher服務(wù)中編寫測試類SpringAmqpTest,并利用RabbitTemplate實現(xiàn)消息發(fā)送:
package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 隊列名稱String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 發(fā)送消息rabbitTemplate.convertAndSend(queueName, message);}
}
3.1.2.消息接收
首先配置MQ地址,在consumer服務(wù)的application.yml中添加配置:
spring:rabbitmq:host: 192.168.150.101 # 主機名port: 5672 # 端口virtual-host: / # 虛擬主機username: root # 用戶名password: 123456 # 密碼
然后在consumer服務(wù)的cn.itcast.mq.listener
包中新建一個類SpringRabbitListener,代碼如下:
package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消費者接收到消息:【" + msg + "】");}
}
3.1.3.測試
啟動consumer服務(wù),然后在publisher服務(wù)中運行測試代碼,發(fā)送MQ消息
3.2.WorkQueue
Work queues,也被稱為(Task queues),任務(wù)模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息。
當(dāng)消息處理比較耗時的時候,可能生產(chǎn)消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。
此時就可以使用work 模型,多個消費者共同處理消息處理,速度就能大大提高了。
3.2.1.消息發(fā)送
這次我們循環(huán)發(fā)送,模擬大量消息堆積現(xiàn)象。
在publisher服務(wù)中的SpringAmqpTest類中添加一個測試方法:
/*** workQueue* 向隊列中不停發(fā)送消息,模擬消息堆積。*/
@Test
public void testWorkQueue() throws InterruptedException {// 隊列名稱String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 發(fā)送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
3.2.2.消息接收
要模擬多個消費者綁定同一個隊列,我們在consumer服務(wù)的SpringRabbitListener中添加2個新的方法:
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消費者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消費者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}
注意到這個消費者sleep了1000秒,模擬任務(wù)耗時。
3.2.3.測試
啟動ConsumerApplication后,在執(zhí)行publisher服務(wù)中剛剛編寫的發(fā)送測試方法testWorkQueue。
可以看到消費者1很快完成了自己的25條消息。消費者2卻在緩慢的處理自己的25條消息。
也就是說消息是平均分配給每個消費者,并沒有考慮到消費者的處理能力。這樣顯然是有問題的。
3.2.4.能者多勞
在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務(wù)的application.yml文件,添加配置:
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息
3.2.5.總結(jié)
Work模型的使用:
- 多個消費者綁定到一個隊列,同一條消息只會被一個消費者處理
- 通過設(shè)置prefetch來控制消費者預(yù)取的消息數(shù)量
3.3.發(fā)布/訂閱
發(fā)布訂閱的模型如圖:
可以看到,在訂閱模型中,多了一個exchange角色,而且過程略有變化:
- Publisher:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊列中,而是發(fā)給X(交換機)
- Exchange:交換機,圖中的X。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有以下3種類型:
- Fanout:廣播,將消息交給所有綁定到交換機的隊列
- Direct:定向,把消息交給符合指定routing key 的隊列
- Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
- Consumer:消費者,與以前一樣,訂閱隊列,沒有變化
- Queue:消息隊列也與以前一樣,接收消息、緩存消息。
Exchange(交換機)只負責(zé)轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!
3.4.Fanout
Fanout,英文翻譯是扇出,我覺得在MQ中叫廣播更合適。
在廣播模式下,消息發(fā)送流程是這樣的:
- 1) 可以有多個隊列
- 2) 每個隊列都要綁定到Exchange(交換機)
- 3) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機,交換機來決定要發(fā)給哪個隊列,生產(chǎn)者無法決定
- 4) 交換機把消息發(fā)送給綁定過的所有隊列
- 5) 訂閱隊列的消費者都能拿到消息
3.5.Direct
在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下:
- 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個
RoutingKey
(路由key) - 消息的發(fā)送方在 向 Exchange發(fā)送消息時,也必須指定消息的
RoutingKey
。 - Exchange不再把消息交給每一個綁定的隊列,而是根據(jù)消息的
Routing Key
進行判斷,只有隊列的Routingkey
與消息的Routing key
完全一致,才會接收到消息
3.5.1.總結(jié)
Direct交換機與Fanout交換機的差異:
- Fanout交換機將消息路由給每一個與之綁定的隊列
- Direct交換機根據(jù)RoutingKey判斷路由給哪個隊列
- 如果多個隊列具有相同的RoutingKey,則與Fanout功能類似
基于@RabbitListener注解聲明隊列和交換機有哪些常見注解?
- @Queue
- @Exchange
3.6.Topic
3.6.1.說明
Topic
類型的Exchange
與Direct
相比,都是可以根據(jù)RoutingKey
把消息路由到不同的隊列。只不過Topic
類型Exchange
可以讓隊列在綁定Routing key
的時候使用通配符!
Routingkey
一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: it.insert
通配符規(guī)則:
#
:匹配一個或多個詞
*
:匹配不多不少恰好1個詞
舉例:
it.#
:能夠匹配it.spu.insert
或者 it.spu
it.*
:只能匹配it.spu
?
圖示:
解釋:
- Queue1:綁定的是
china.#
,因此凡是以china.
開頭的routing key
都會被匹配到。包括china.news和china.weather - Queue2:綁定的是
#.news
,因此凡是以.news
結(jié)尾的routing key
都會被匹配。包括china.news和japan.news
3.6.2.總結(jié)
描述下Direct交換機與Topic交換機的差異?
- Topic交換機接收的消息RoutingKey必須是多個單詞,以
**.**
分割 - Topic交換機與隊列綁定時的bindingKey可以指定通配符
#
:代表0個或多個詞*
:代表1個詞
3.7.消息轉(zhuǎn)換器
之前說過,Spring會把你發(fā)送的消息序列化為字節(jié)發(fā)送給MQ,接收消息的時候,還會把字節(jié)反序列化為Java對象。
只不過,默認(rèn)情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:
- 數(shù)據(jù)體積過大
- 有安全漏洞
- 可讀性差
我們來測試一下。
3.7.1.測試默認(rèn)轉(zhuǎn)換器
我們修改消息發(fā)送的代碼,發(fā)送一個Map對象:
@Test
public void testSendMap() throws InterruptedException {// 準(zhǔn)備消息Map<String,Object> msg = new HashMap<>();msg.put("name", "Jack");msg.put("age", 21);// 發(fā)送消息rabbitTemplate.convertAndSend("simple.queue","", msg);
}
停止consumer服務(wù)
發(fā)送消息后查看控制臺:
3.7.2.配置JSON轉(zhuǎn)換器
顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。
在publisher和consumer兩個服務(wù)中都引入依賴:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
配置消息轉(zhuǎn)換器。
在啟動類中添加一個Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}