怎么給網(wǎng)站做php后臺百度文庫賬號登錄入口
目錄
- 一、簡介
- 1.1、消費模式
- 二、消費者
- 2.1、maven依賴
- 2.2、application配置
- 2.3、消費監(jiān)聽
- 三、生產(chǎn)者
- 3.1、發(fā)送消息
- 3.2、運行結果
- 四、其他
一、簡介
??在之前的文章中,我們講過了,同步發(fā)送單條消息,異步發(fā)送單條消息,發(fā)送單向消息,發(fā)送順序消息,批量發(fā)送消息,事務消息,我們使用的模式都是 集群消費模式(Cluster),本文就來講另外一種消息消費模式,也就是廣播消費模式(Broadcast)
1.1、消費模式
??在 Apache RocketMQ 中,實現(xiàn)消息消費的方式主要是兩種:
-
集群消費模式(Cluster):
在集群消費模式下,同一個消費者組(Consumer Group)中的每個消費者都會消費消息的一個副本。消息會被分發(fā)到不同的消費者實例上,但是同一個消息只會被同一個消費者組中的一個消費者消費。 -
廣播消費模式(Broadcast):
在廣播消費模式下,同一個消費者組中的每個消費者都會收到消息的一個副本,即每個消費者都會獨立地消費消息。消息會被廣播到同一個消費者組中的所有消費者實例上。
??那么怎么使用廣播消費模式呢?其實很簡單,通過在消費者的 @RocketMQMessageListener 注解中設置 messageModel 參數(shù)為 MessageModel.BROADCASTING,即可將消費者設置為廣播模式。在廣播模式下,同一個消費者組中的每個消費者都會收到消息的一個副本,每個消費者都會獨立地消費消息,從而實現(xiàn)了消息的廣播消費。接下里看看具體操作吧。
二、消費者
2.1、maven依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq</artifactId><groupId>com.alian</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>11-broadcasting-message-one</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties></project>
2.2、application配置
application.properties
server.port=8011# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默認的消費者組
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 批量拉取消息的數(shù)量
rocketmq.consumer.pull-batch-size=10
# 廣播消費模式
rocketmq.consumer.message-model=BROADCASTING
??實際上對于本文來說,下面兩個配置不用配置,也不會生效。
# 默認的消費者組
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 廣播消費模式
rocketmq.consumer.message-model=BROADCASTING
??因為優(yōu)先的是@RocketMQMessageListener 注解中設置 consumerGroup 和messageModel 參數(shù)。
2.3、消費監(jiān)聽
??@RocketMQMessageListener是RocketMQ提供的注解,用于配置消費者監(jiān)聽器的相關屬性。
package com.alian.broadcasting;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
@RocketMQMessageListener(topic = "broadcasting_string_message_topic",consumerGroup = "BROADCASTING_CONSUMER_GROUP",messageModel = MessageModel.BROADCASTING)
public class StringMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("第一個消費者接收到的字符串消息: {}", message);// 處理消息的業(yè)務邏輯}
}
??關于這里@RocketMQMessageListener的參數(shù)做個簡單解釋:
- topic:必填,指定該消費者訂閱的Topic名稱
- consumerGroup:必填,指定該消費者所屬的消費者組名稱,同一個組內的消費者實例通常進行負載均衡消費
- messageModel:設置消費模式,取值范圍CLUSTERING(集群消費)、BROADCASTING(廣播消費)
MessageModel.java
public enum MessageModel {BROADCASTING("BROADCASTING"),CLUSTERING("CLUSTERING");private final String modeCN;MessageModel(String modeCN) {this.modeCN = modeCN;}public String getModeCN() {return this.modeCN;}
}
三、生產(chǎn)者
??生產(chǎn)者我就復用前面批量消息發(fā)送的模塊了
3.1、發(fā)送消息
@Slf4j
@SpringBootTest
public class SendBatchedBroadcastingMessageTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void syncSendStringMessagesWithBuilder() {String topic = "broadcasting_string_message_topic";for (int i = 0; i < 10; i++) {String message = "廣播消息:" + i;Message<String> rocketMessage = MessageBuilder.withPayload(message).build();rocketMQTemplate.convertAndSend(topic, rocketMessage);}}@Testpublic void syncSendBatchStringMessagesWithBuilder() {String topic = "string_message_topic";String message = "批量廣播消息:";List<Message<String>> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> rocketMessage = MessageBuilder.withPayload(message + i)// 設置消息類型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSend發(fā)送批量消息SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);log.info("批量消息發(fā)送結果:{}",sendResult);}@AfterEachpublic void waiting() {try {Thread.sleep(3000L);} catch (InterruptedException e) {e.printStackTrace();}}}
我們先啟動消費者,然后生產(chǎn)者發(fā)送消息。
3.2、運行結果
運行結果:
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 廣播消息:1
[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 廣播消息:0
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 廣播消息:3
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 廣播消息:2
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 廣播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 廣播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 廣播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 廣播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 廣播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 廣播消息:9
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 批量廣播消息:0
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 批量廣播消息:2
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 批量廣播消息:4
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 批量廣播消息:5
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 批量廣播消息:3
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 批量廣播消息:1
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 批量廣播消息:6
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 批量廣播消息:9
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 批量廣播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第一個消費者接收到的字符串消息: 批量廣播消息:8[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 廣播消息:0
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 廣播消息:1
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 廣播消息:2
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 廣播消息:3
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 廣播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 廣播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 廣播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 廣播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 廣播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 廣播消息:9
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 批量廣播消息:4
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 批量廣播消息:6
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 批量廣播消息:2
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 批量廣播消息:3
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 批量廣播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 批量廣播消息:8
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 批量廣播消息:1
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 批量廣播消息:0
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 批量廣播消息:9
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第二個消費者接收到的字符串消息: 批量廣播消息:5
四、其他
??RocketMQ 通過消費者組(Consumer Group)來維護不同消費者的消費進度。每個消費者組都有一個消費進度(offset),用于標記該組下的消費者在某個主題(Topic)和隊列(Queue)上已經(jīng)消費到的位置。所以:不同的消費者組會被視為不同的消費者;如果消費者重啟或重新加入組,就能從對應Queue的offset處繼續(xù)消費。
??不過使用廣播消費模式時,Consumer Group 的概念基本上沒有作用,因為每個消費者實例都會獨立地收到消息的一個副本。在廣播模式下,同一個消費者組中的每個消費者都會收到消息的一個副本,每個消費者都會獨立地消費消息,而不像集群消費模式中那樣,一個消費者組中的消費者會共同消費消息。
??廣播消費模式在RocketMQ中最好的好處就是消費者解耦:不同的消費者可以獨立消費消息,相互之間不受影響,提高了系統(tǒng)的擴展性,它的適用場景有:
- 日志收集 - 需要將日志數(shù)據(jù)分發(fā)給多個日志收集系統(tǒng),每個系統(tǒng)都需要收到全量日志。
- 數(shù)據(jù)備份 - 實時備份數(shù)據(jù)到多個存儲系統(tǒng),確保數(shù)據(jù)有冗余副本。
- 信息推送 - 向多個推送通道投遞并發(fā)送消息通知,如站內信、短信、Push等。
- 狀態(tài)同步 - 將數(shù)據(jù)變更實時同步到集群的所有節(jié)點,保證集群節(jié)點狀態(tài)一致。
- 負載均衡 - 將任務或請求廣播給所有服務實例,由每個實例獨立處理,實現(xiàn)負載分擔。
- 監(jiān)控告警 - 將系統(tǒng)監(jiān)控數(shù)據(jù)廣播給多個監(jiān)控系統(tǒng),多視角分析。