電子商務(wù)網(wǎng)站服務(wù)器上海網(wǎng)絡(luò)營(yíng)銷公司
使用 Redis List 和 Pub/Sub 實(shí)現(xiàn)簡(jiǎn)單的消息隊(duì)列
Redis 本身不是專門的消息隊(duì)列系統(tǒng),但它提供了多種數(shù)據(jù)結(jié)構(gòu)(如 List、Pub/Sub、Stream)來(lái)實(shí)現(xiàn)消息隊(duì)列功能。根據(jù)不同的業(yè)務(wù)需求,可以選擇不同的方式:
在 Redis 中,可以使用 List 或 Pub/Sub 模塊實(shí)現(xiàn)簡(jiǎn)單的消息隊(duì)列。兩者的適用場(chǎng)景和實(shí)現(xiàn)方式有所不同:
- List(列表):適用于任務(wù)隊(duì)列(Task Queue),支持持久化存儲(chǔ),消費(fèi)者可以消費(fèi)歷史消息,支持 多消費(fèi)者競(jìng)爭(zhēng)消費(fèi)(類似于 Kafka)。
- Pub/Sub(發(fā)布/訂閱):適用于實(shí)時(shí)推送(Event Notification),不存儲(chǔ)消息,消費(fèi)者只能接收發(fā)布時(shí)刻的消息,適合 多消費(fèi)者廣播消費(fèi)(類似于 RabbitMQ Fanout)。
方式一:使用 Redis List 實(shí)現(xiàn)簡(jiǎn)單的消息隊(duì)列
Redis 的 LPUSH
和 BRPOP
操作可以用來(lái)構(gòu)建一個(gè) 基于拉取的消息隊(duì)列。
1. 生產(chǎn)者(Producer)
生產(chǎn)者將消息推送到 Redis List 的尾部:
LPUSH my_queue "message1"
LPUSH my_queue "message2"
或在 Python 中:
import redisr = redis.Redis(host='localhost', port=6379, decode_responses=True)
r.lpush('my_queue', 'message1')
r.lpush('my_queue', 'message2')
LPUSH my_queue "message"
:將新消息插入隊(duì)列的 左側(cè)(頭部)。RPUSH my_queue "message"
也可以使用,它會(huì)將消息插入隊(duì)列的 右側(cè)(尾部)。
2. 消費(fèi)者(Consumer)
消費(fèi)者使用 BRPOP
(阻塞)或 RPOP
(非阻塞)從隊(duì)列的 右側(cè) 彈出消息:
BRPOP my_queue 0
或在 Python 中:
while True:msg = r.brpop('my_queue', timeout=0) # 阻塞模式if msg:print("Received:", msg[1]) # msg[1] 是消息內(nèi)容
BRPOP my_queue 0
:如果隊(duì)列為空,則 阻塞 等待新的消息。RPOP my_queue
:如果隊(duì)列為空,直接返回None
,不會(huì)阻塞。
3. 多消費(fèi)者
多個(gè)消費(fèi)者可以競(jìng)爭(zhēng)消費(fèi)消息,即每條消息只會(huì)被 其中一個(gè) 消費(fèi)者消費(fèi)。例如,有兩個(gè)消費(fèi)者在 BRPOP
同一個(gè)隊(duì)列,Redis 只會(huì)把某個(gè)消息分配給其中一個(gè)。
4. 消息確認(rèn)與持久化
由于 Redis List 只存儲(chǔ)消息,不支持自動(dòng)重試,因此可以配合 LPUSH+RPOPLPUSH 事務(wù) 實(shí)現(xiàn)持久化:
- 先用
RPOPLPUSH my_queue processing_queue
把消息從my_queue
轉(zhuǎn)移到processing_queue
,然后再處理。 - 處理完成后,從
processing_queue
中刪除該消息。
msg = r.rpoplpush('my_queue', 'processing_queue') # 轉(zhuǎn)移到處理中隊(duì)列
if msg:process_message(msg) # 處理消息r.lrem('processing_queue', 1, msg) # 處理完成后刪除
方式二:使用 Redis Pub/Sub 實(shí)現(xiàn)消息隊(duì)列
Pub/Sub 適用于實(shí)時(shí)消息推送,消息不會(huì)存儲(chǔ),適合事件廣播。
1. 生產(chǎn)者(Publisher)
發(fā)布者向某個(gè)頻道(channel)發(fā)送消息:
PUBLISH my_channel "message1"
或在 Python 中:
r.publish('my_channel', 'message1')
2. 消費(fèi)者(Subscriber)
訂閱者監(jiān)聽消息:
SUBSCRIBE my_channel
或在 Python:
pubsub = r.pubsub()
pubsub.subscribe('my_channel')for message in pubsub.listen():if message['type'] == 'message':print("Received:", message['data'].decode())
3. Pub/Sub 適用場(chǎng)景
- 實(shí)時(shí)消息推送(如 WebSocket、聊天室)。
- 事件驅(qū)動(dòng)系統(tǒng)(如日志收集、狀態(tài)變更通知)。
- 多消費(fèi)者廣播消費(fèi),所有訂閱者都會(huì)收到相同的消息。
4. Pub/Sub 局限性
- 消息 不會(huì)持久化,如果訂閱者掉線,它不會(huì)收到丟失的消息。
- 不能確保 消息按順序消費(fèi)。
- 無(wú)法回溯歷史消息(相比 Kafka)。
總結(jié):Redis List vs Pub/Sub
特性 | Redis List | Redis Pub/Sub |
---|---|---|
消息存儲(chǔ) | 存儲(chǔ)在 List,直到被消費(fèi) | 不存儲(chǔ),實(shí)時(shí)傳輸 |
消費(fèi)者模型 | 多消費(fèi)者競(jìng)爭(zhēng)消費(fèi)(類似任務(wù)隊(duì)列) | 多消費(fèi)者廣播消費(fèi)(類似事件通知) |
可靠性 | 支持重試和確認(rèn)機(jī)制 | 訂閱者掉線會(huì)丟失消息 |
適用場(chǎng)景 | 任務(wù)隊(duì)列(如延遲任務(wù)、任務(wù)分發(fā)) | 實(shí)時(shí)推送(如聊天、事件通知) |
如果需要 持久化隊(duì)列,建議使用 Redis List;如果只是 實(shí)時(shí)推送,可以用 Pub/Sub。