品牌設(shè)計網(wǎng)站怎樣推廣自己的廣告
一、概述
消息隊列
定義
- 消息隊列模型:一種分布式系統(tǒng)中的消息傳遞方案,由消息隊列、生產(chǎn)者和消費者組成
- 消息隊列:負責(zé)存儲和管理消息的中間件,也稱為消息代理(Message Broker)
- 生產(chǎn)者:負責(zé) 產(chǎn)生并發(fā)送 消息到隊列的應(yīng)用程序
- 消費者:負責(zé)從隊列 獲取并處理 消息的應(yīng)用程序
功能
:實現(xiàn)消息發(fā)送和處理的解耦,支持異步通信,提高系統(tǒng)的可擴展性和可靠性- 主流消息隊列解決方案
- RabbitMQ:輕量級,支持多種協(xié)議,適合中小規(guī)模應(yīng)用
- RocketMQ:阿里開源,高性能,適合大規(guī)模分布式應(yīng)用
Stream
定義
:Stream:Redis 5.0 引入的一種數(shù)據(jù)類型,用于處理高吞吐量的消息流、事件流等場景功能
:按時間順序 ”添加、讀取、消費“ 消息,支持消費者組、消息確認等功能
二、Stream 工作流程
- 寫入消息:
- 生產(chǎn)者通過?
XADD
?向 Stream 中添加消息。每條消息自動獲得唯一的 ID,按時間順序存入 Stream。
- 生產(chǎn)者通過?
- 創(chuàng)建消費者組
- 如果使用消費者組,首先需要通過?
XGROUP CREATE
?創(chuàng)建消費者組。 - 消費者組會根據(jù)時間順序?qū)⑾⒎峙浣o組內(nèi)的消費者。
- 如果使用消費者組,首先需要通過?
- 讀取消息:
- 消費者使用
XREADGROUP
命令讀取 Stream 中的消息。 - 消息按規(guī)則分配給不同消費者處理,每個消費者讀取到不同的消息。
- 消費者使用
- 確認消息:
- 消費者在處理完消息后,使用?
XACK
?命令確認消息,表示該消息已成功處理。 - 如果消息未確認(例如消費者崩潰或超時),它將保持在?Pending?狀態(tài),等待重新分配給其他消費者。
- 消費者在處理完消息后,使用?
- 重新分配未確認消息:
- 如果消息在一定時間內(nèi)沒有被確認,其他消費者可以讀取未確認的消息并進行處理。
- 可通過
XPENDING
命令查看未確認消息,或在消費者組中設(shè)置時間閾值自動重新分配。
- 刪除消費者組:
- 不再需要消費者組時,使用
XGROUP DESTROY
命令刪除消費者組
- 不再需要消費者組時,使用
三、Stream 實現(xiàn)
消費者組模式
定義
:Redis Streams 的一部分,用于處理消息的分布式消費優(yōu)點
- 消息分流:多消費者爭搶消息,加快消費速度,避免消息堆積
- 消息標(biāo)示:避免消息漏讀,消費者讀取消息后不馬上銷毀,加入 consumerGroup 維護的 pending list 隊列等待 ACK
- 消息確認:通過消息 ACK 機制,保證消息至少被消費一次
- 可以阻塞讀取,避免盲等
實現(xiàn)方法
:通過 Stream 數(shù)據(jù)類型實現(xiàn)消息隊列,命令以 “X” 開頭
常用命令
XGROUP CREATE key groupName ID [MKSTREAM]
- 功能:創(chuàng)建消費者組
- 參數(shù)
- key:隊列名稱
- groupName:組名稱
- ID:起始 ID 標(biāo)識,$ 表示隊列中最后一個消息,0 表示隊列中第一個消息
- MKSTREAM:隊列不存在則創(chuàng)建隊列
XGROUP DESTORY key groupName
- 功能:刪除指定消費者組
XGROUP CREATECONSUMER key groupName consumerName
- 功能:添加組中消費者
XGROUP DELCONSUMER key groupName consumerName
- 功能:刪除組中消費者
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
- 功能:讀取組中的消息
- gourp:消費者組名稱
- consumer:消費者名稱(不存在則自動創(chuàng)建)
- count:本次查詢的最大數(shù)量
- BLOCK milliseconds:當(dāng)沒有消息時最長等待時間
- NOACK:無需手動 ACK,獲取到消息后自動確認
- STREAMS KEY:指定隊列名稱
- ID:獲取消息的起始 ID,
>
表示從下一個未消費消息開始 (常用)
XPENDING key group [ [ IDLE min-idle-time ] start end count [consumer] ]
- 功能:獲取 pending-list 中的消息
- IDLE:獲取消息后、確認消息前的這段時間,空閑時間超過 min-idle-time 則取出
- start:獲取的最小目標(biāo) ID
- end:獲取的最大目標(biāo) ID
- count:獲取的數(shù)量
- consumer:獲取 consumer 的 pending-list
XACK key group ID [ ID … ]
- 功能:確認從組中讀取的消息已被處理
- key:隊列名稱
- group:組名稱
- ID:消息的 ID
表格版命令
-
命令
命令 功能 XGROUP CREATE key groupName ID [MKSTREAM] 創(chuàng)建消費者組 XGROUP DESTORY key groupName 刪除指定消費者組 XGROUP CREATECONSUMER key groupName consumerName 添加組中消費者 XGROUP DELCONSUMER key groupName consumerName 刪除組中消費者 XREADGROUP GROUP groupName consumerName [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …] 讀取組中的消息,ID 填寫 “ >
” 則讀取第一條未讀消息XACK key group ID [ ID … ] 確認從組中讀取的消息已被處理 -
屬性
屬性名 定義 key 隊列名稱 groupName 消費者組名稱 ID 起始 ID 標(biāo)示,$ 代表隊列中最后一個消息,0 代表第一個消息 MKSTREAM 隊列不存在時自動創(chuàng)建隊列 BLOCK milliseconds 沒有消息時的最大等待時長 NOACK 無需手動 ACK,獲取到消息后自動確認 STREAMS key 指定隊列名稱
運行邏輯
while(true) {// 嘗試監(jiān)聽隊列,使用阻塞模式,最長等待 2000 msObject msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 2000 STREAMS s1 >");if(msg == null) {continue;}try {// 處理消息,完成后一定要 ACKhandleMessage(msg);} catch (Exception e) {while(true) {// 重新讀取阻塞隊列消息Object msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 STREAM S1 0");if(msg == null) // 如果阻塞隊中的消息已經(jīng)全部處理則退出pending-listbreak;try {handleMessage(msg); // 重新處理 pending-list 中的消息} catch (Exception e){continue; // 如果還出錯, 則繼續(xù)重新讀取}}}
}
四、示例
-
目標(biāo):消息隊列實現(xiàn)數(shù)據(jù)庫異步修改數(shù)據(jù)庫,將下單 message 緩存在 redis 中,減小下單操作對數(shù)據(jù)庫的沖擊
-
項目結(jié)構(gòu)
- RedisConfig?配置類:創(chuàng)建消費者組是一次性的操作,適合放在配置類中
- VoucherOrderHandler?內(nèi)部類:消費者的邏輯和訂單業(yè)務(wù)相關(guān),因此適合放在?VoucherOrderServiceImpl?中
- 多線程啟動邏輯:消費者線程的啟動與訂單業(yè)務(wù)密切相關(guān),直接放在?VoucherOrderServiceImpl?類中更符合職責(zé)分離原則
src/main/java ├── com/example │ ├── config │ │ └── RedisConfig.java // Redis 配置類,包含消費者組初始化 │ ├── service │ │ ├── VoucherOrderService.java │ │ └── impl │ │ └── VoucherOrderServiceImpl.java // 包含 VoucherOrderHandler 內(nèi)部類 │ ├── entity │ │ └── VoucherOrder.java // 優(yōu)惠券訂單實體 │ ├── utils │ │ └── BeanUtil.java // 用于 Map 轉(zhuǎn) Bean 的工具類 │ └── controller │ └── VoucherOrderController.java // 如果有 Controller
-
創(chuàng)建消費者組(config.RedisConfig)
@Bean public void initStreamGroup() {// 檢查是否存在消費者組 g1try {stringRedisTemplate.opsForStream().createGroup("stream.orders", "g1");} catch (RedisSystemException e) {// 如果 group 已存在,拋出異常,可忽略log.warn("消費者組 g1 已存在");} }
-
創(chuàng)建消費者線程
- 位置:作為 VoucherOrderServiceImpl 內(nèi)的預(yù)構(gòu)造部分
@PostConstruct public void startConsumers() {for (int i = 0; i < 5; i++) { // 5 個線程,模擬多個消費者new Thread(new VoucherOrderHandler()).start();} }
-
添加消息到消息隊列 (src/main/resources/lua/SECKILL_SCRIPT.lua)
--1. 參數(shù)列表 --1.1. 優(yōu)惠券id local voucherId = ARGV[1] --1.2. 用戶id local userId = ARGV[2] --1.3. 訂單id local orderId = ARGV[3]--2. 數(shù)據(jù)key local stockKey = 'seckill:stock:' .. voucherId --2.1. 庫存key local orderKey = 'seckill:order' .. voucherId --2.2. 訂單key--3. 腳本業(yè)務(wù) --3.1. 判斷庫存是否充足 get stockKey if( tonumber( redis.call('GET', stockKey) ) <= 0 ) thenreturn 1 end --3.2. 判斷用戶是否重復(fù)下單 SISMEMBER orderKey userId if( redis.call( 'SISMEMBER', orderKey, userId ) == 1 ) thenreturn 2 end --3.4 扣庫存 incrby stockKey -1 redis.call( 'INCRBY', stockKey, -1 ) --3.5 下單(保存用戶) sadd orderKey userId redis.call( 'SADD', orderKey, userId ) -- 3.6. 發(fā)送消息到隊列中 redis.call( 'XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )
-
創(chuàng)建消費者類(ServiceImpl)
- 位置:作為 VoucherOrderServiceImpl 內(nèi)的私有類
// 在ServiceImpl中創(chuàng)建一個VoucherOrderHandler消費者類,專門用于處理消息隊列中的消息 private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1. 獲取消息隊列中的訂單信息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create( "stream.order", ReadOffset.lastConsumed()));// 2. 沒有消息則重新監(jiān)聽if (list == null || list.isEmpty() ) continue;// 3. 獲取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 創(chuàng)建訂單createVoucherOrder(voucherOrder);// 5. 確認當(dāng)前消息已消費 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch ( Exception e) {log.error("處理訂單異常", e);// 6. 處理訂單失敗則消息會加入pending-list,繼續(xù)處理pending-listhandlePendingList();}}}// 處理pending-list中的消息private void handlePendingList() {while(true) {try {// 1. 消費pending-list中的消息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"), // 消費者此消息的消費者StreamReadOptions.empty().count(1), // StreamOffset.create("stream.order", ReadOffset.from("0")) // 從pending-list的第一條消息開始讀);// 2. 退出條件, list 為空 -> pending-list 已全部處理if(list == null || list.isEmpty()) break;// 3. 獲取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 創(chuàng)建訂單createVoucherOrder(voucherOrder);// 5. 確認消息已消費(XACK)stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("處理pendding訂單異常", e);try{Thread.sleep(20); // 如果發(fā)生異常則休眠一會再重新消費pending-list中的消息} catch (Exception e2) {e.printStackTrace(); }}}} }
-
創(chuàng)建消息方法
- 目標(biāo):用戶通過這個方法發(fā)送一條創(chuàng)建訂單的 Message 給 Redis Stream
// 創(chuàng)建Lua腳本對象 private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// Lua腳本初始化 (通過靜態(tài)代碼塊) static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/SECKILL_SCRIPT.lua"));SECKILL_SCRIPT.setResultType(Long.class); }@Override public void createVoucherOrder(Long voucherId, Long userId) {// 生成訂單 ID(模擬)long orderId = System.currentTimeMillis();// 執(zhí)行 Lua 腳本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(), // 使用空的 key 列表voucherId.toString(), userId.toString(), String.valueOf(orderId));// 根據(jù) Lua 腳本返回結(jié)果處理if (result == 1) {throw new RuntimeException("庫存不足!");} else if (result == 2) {throw new RuntimeException("不能重復(fù)下單!");}// 如果腳本執(zhí)行成功,則訂單消息會進入 Redis Stream,消費者組會自動處理System.out.println("訂單創(chuàng)建成功!"); }
(缺陷) 單消費者模式
常用命令
- XADD key [NOMKSTREAM] [MAXLEN | MINID [=|~] threshold [LIMIT count] * | ID field value [field value …]
- XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ ID … ]
缺陷
:有消息漏讀風(fēng)險
五、其他消息隊列方案
(缺陷) List 實現(xiàn)
優(yōu)點
- 不受 JVM 內(nèi)存上限限制:因為利用 Redis 存儲
- 數(shù)據(jù)安全 :因為基于 List 結(jié)構(gòu)本身是數(shù)據(jù)存儲,基于 Redis 持久化機制
- 消息有序性:通過 List 結(jié)構(gòu)的 LPUSH & BRPOP 命令實現(xiàn)順序
缺點
- 消息丟失:BRPOP 的時候如果宕機則消息會丟失
- 只支持單消費者
(缺陷) PubSub 實現(xiàn)
- 定義
- Publish & Subscribe 模型,一種消息隊列模型
- 生產(chǎn)者向指定的 channel 來 public 消息
- 消費者從 subscribe 的 channel 中接收消息
- 功能:支持多消費者模式,多個消費者可以同時 subscribe 一個 channel
- 優(yōu)點:采用發(fā)布訂閱模型,支持多生產(chǎn)者、消費者
- 缺點
- 不支持數(shù)據(jù)持久化
- 無法避免消息丟失
- 消息堆積有上限,超出時數(shù)據(jù)丟失