国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

品牌設(shè)計網(wǎng)站怎樣推廣自己的廣告

品牌設(shè)計網(wǎng)站,怎樣推廣自己的廣告,一個網(wǎng)站的制作流程,wordpress誤刪的后果一、概述 消息隊列 定義 消息隊列模型:一種分布式系統(tǒng)中的消息傳遞方案,由消息隊列、生產(chǎn)者和消費者組成消息隊列:負責(zé)存儲和管理消息的中間件,也稱為消息代理(Message Broker)生產(chǎn)者:負責(zé) 產(chǎn)…

一、概述

消息隊列

  1. 定義
    1. 消息隊列模型:一種分布式系統(tǒng)中的消息傳遞方案,由消息隊列、生產(chǎn)者和消費者組成
    2. 消息隊列:負責(zé)存儲和管理消息的中間件,也稱為消息代理(Message Broker)
    3. 生產(chǎn)者:負責(zé) 產(chǎn)生并發(fā)送 消息到隊列的應(yīng)用程序
    4. 消費者:負責(zé)從隊列 獲取并處理 消息的應(yīng)用程序
  2. 功能:實現(xiàn)消息發(fā)送和處理的解耦,支持異步通信,提高系統(tǒng)的可擴展性和可靠性
  3. 主流消息隊列解決方案
    1. RabbitMQ:輕量級,支持多種協(xié)議,適合中小規(guī)模應(yīng)用
    2. RocketMQ:阿里開源,高性能,適合大規(guī)模分布式應(yīng)用

Stream

  1. 定義:Stream:Redis 5.0 引入的一種數(shù)據(jù)類型,用于處理高吞吐量的消息流、事件流等場景
  2. 功能:按時間順序 ”添加、讀取、消費“ 消息,支持消費者組、消息確認等功能

二、Stream 工作流程

  1. 寫入消息
    1. 生產(chǎn)者通過?XADD?向 Stream 中添加消息。每條消息自動獲得唯一的 ID,按時間順序存入 Stream。
  2. 創(chuàng)建消費者組
    1. 如果使用消費者組,首先需要通過?XGROUP CREATE?創(chuàng)建消費者組。
    2. 消費者組會根據(jù)時間順序?qū)⑾⒎峙浣o組內(nèi)的消費者。
  3. 讀取消息
    1. 消費者使用 XREADGROUP 命令讀取 Stream 中的消息。
    2. 消息按規(guī)則分配給不同消費者處理,每個消費者讀取到不同的消息。
  4. 確認消息
    1. 消費者在處理完消息后,使用?XACK?命令確認消息,表示該消息已成功處理。
    2. 如果消息未確認(例如消費者崩潰或超時),它將保持在?Pending?狀態(tài),等待重新分配給其他消費者。
  5. 重新分配未確認消息
    1. 如果消息在一定時間內(nèi)沒有被確認,其他消費者可以讀取未確認的消息并進行處理。
    2. 可通過 XPENDING 命令查看未確認消息,或在消費者組中設(shè)置時間閾值自動重新分配。
  6. 刪除消費者組
    1. 不再需要消費者組時,使用 XGROUP DESTROY 命令刪除消費者組

三、Stream 實現(xiàn)

消費者組模式

  1. 定義:Redis Streams 的一部分,用于處理消息的分布式消費
  2. 優(yōu)點
    1. 消息分流:多消費者爭搶消息,加快消費速度,避免消息堆積
    2. 消息標(biāo)示:避免消息漏讀,消費者讀取消息后不馬上銷毀,加入 consumerGroup 維護的 pending list 隊列等待 ACK
    3. 消息確認:通過消息 ACK 機制,保證消息至少被消費一次
    4. 可以阻塞讀取,避免盲等
  3. 實現(xiàn)方法 :通過 Stream 數(shù)據(jù)類型實現(xiàn)消息隊列,命令以 “X” 開頭

常用命令

XGROUP CREATE key groupName ID [MKSTREAM]

  1. 功能:創(chuàng)建消費者組
  2. 參數(shù)
    1. key:隊列名稱
    2. groupName:組名稱
    3. ID:起始 ID 標(biāo)識,$ 表示隊列中最后一個消息,0 表示隊列中第一個消息
    4. MKSTREAM:隊列不存在則創(chuàng)建隊列

XGROUP DESTORY key groupName

  1. 功能:刪除指定消費者組

XGROUP CREATECONSUMER key groupName consumerName

  1. 功能:添加組中消費者

XGROUP DELCONSUMER key groupName consumerName

  1. 功能:刪除組中消費者

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]

  1. 功能:讀取組中的消息
  2. gourp:消費者組名稱
  3. consumer:消費者名稱(不存在則自動創(chuàng)建)
  4. count:本次查詢的最大數(shù)量
  5. BLOCK milliseconds:當(dāng)沒有消息時最長等待時間
  6. NOACK:無需手動 ACK,獲取到消息后自動確認
  7. STREAMS KEY:指定隊列名稱
  8. ID:獲取消息的起始 ID,> 表示從下一個未消費消息開始 (常用)

XPENDING key group [ [ IDLE min-idle-time ] start end count [consumer] ]

  1. 功能:獲取 pending-list 中的消息
  2. IDLE:獲取消息后、確認消息前的這段時間,空閑時間超過 min-idle-time 則取出
  3. start:獲取的最小目標(biāo) ID
  4. end:獲取的最大目標(biāo) ID
  5. count:獲取的數(shù)量
  6. consumer:獲取 consumer 的 pending-list

XACK key group ID [ ID … ]

  1. 功能:確認從組中讀取的消息已被處理
  2. key:隊列名稱
  3. group:組名稱
  4. ID:消息的 ID

表格版命令

  1. 命令

    命令功能
    XGROUP CREATE key groupName ID [MKSTREAM]創(chuàng)建消費者組
    XGROUP DESTORY key groupName刪除指定消費者組
    XGROUP CREATECONSUMER key groupName consumerName添加組中消費者
    XGROUP DELCONSUMER key groupName consumerName刪除組中消費者
    XREADGROUP GROUP groupName consumerName [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]讀取組中的消息,ID 填寫 “>” 則讀取第一條未讀消息
    XACK key group ID [ ID … ]確認從組中讀取的消息已被處理
  2. 屬性

    屬性名定義
    key隊列名稱
    groupName消費者組名稱
    ID起始 ID 標(biāo)示,$ 代表隊列中最后一個消息,0 代表第一個消息
    MKSTREAM隊列不存在時自動創(chuàng)建隊列
    BLOCK milliseconds沒有消息時的最大等待時長
    NOACK無需手動 ACK,獲取到消息后自動確認
    STREAMS key指定隊列名稱

運行邏輯

while(true) {// 嘗試監(jiān)聽隊列,使用阻塞模式,最長等待 2000 msObject msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 2000 STREAMS s1 >");if(msg == null) {continue;}try {// 處理消息,完成后一定要 ACKhandleMessage(msg);} catch (Exception e) {while(true) {// 重新讀取阻塞隊列消息Object msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 STREAM S1 0");if(msg == null)               // 如果阻塞隊中的消息已經(jīng)全部處理則退出pending-listbreak;try {handleMessage(msg);    			// 重新處理 pending-list 中的消息} catch (Exception e){continue;                   // 如果還出錯, 則繼續(xù)重新讀取}}}
}

四、示例

  1. 目標(biāo):消息隊列實現(xiàn)數(shù)據(jù)庫異步修改數(shù)據(jù)庫,將下單 message 緩存在 redis 中,減小下單操作對數(shù)據(jù)庫的沖擊

  2. 項目結(jié)構(gòu)

    1. RedisConfig?配置類:創(chuàng)建消費者組是一次性的操作,適合放在配置類中
    2. VoucherOrderHandler?內(nèi)部類:消費者的邏輯和訂單業(yè)務(wù)相關(guān),因此適合放在?VoucherOrderServiceImpl?中
    3. 多線程啟動邏輯:消費者線程的啟動與訂單業(yè)務(wù)密切相關(guān),直接放在?VoucherOrderServiceImpl?類中更符合職責(zé)分離原則
    src/main/java
    ├── com/example
    │   ├── config
    │   │   └── RedisConfig.java                  // Redis 配置類,包含消費者組初始化
    │   ├── service
    │   │   ├── VoucherOrderService.java
    │   │   └── impl
    │   │       └── VoucherOrderServiceImpl.java  // 包含 VoucherOrderHandler 內(nèi)部類
    │   ├── entity
    │   │   └── VoucherOrder.java                 // 優(yōu)惠券訂單實體
    │   ├── utils
    │   │   └── BeanUtil.java                     // 用于 Map 轉(zhuǎn) Bean 的工具類
    │   └── controller
    │       └── VoucherOrderController.java       // 如果有 Controller
    
  3. 創(chuàng)建消費者組(config.RedisConfig)

    @Bean
    public void initStreamGroup() {// 檢查是否存在消費者組 g1try {stringRedisTemplate.opsForStream().createGroup("stream.orders", "g1");} catch (RedisSystemException e) {// 如果 group 已存在,拋出異常,可忽略log.warn("消費者組 g1 已存在");}
    }
    
  4. 創(chuàng)建消費者線程

    1. 位置:作為 VoucherOrderServiceImpl 內(nèi)的預(yù)構(gòu)造部分
    @PostConstruct
    public void startConsumers() {for (int i = 0; i < 5; i++) { // 5 個線程,模擬多個消費者new Thread(new VoucherOrderHandler()).start();}
    }
    
  5. 添加消息到消息隊列 (src/main/resources/lua/SECKILL_SCRIPT.lua)

    --1. 參數(shù)列表
    --1.1. 優(yōu)惠券id
    local voucherId = ARGV[1]
    --1.2. 用戶id
    local userId = ARGV[2]
    --1.3. 訂單id
    local orderId = ARGV[3]--2. 數(shù)據(jù)key
    local stockKey = 'seckill:stock:' .. voucherId          --2.1. 庫存key
    local orderKey = 'seckill:order' .. voucherId           --2.2. 訂單key--3. 腳本業(yè)務(wù)
    --3.1. 判斷庫存是否充足 get stockKey
    if( tonumber( redis.call('GET', stockKey) ) <= 0 ) thenreturn 1
    end
    --3.2. 判斷用戶是否重復(fù)下單 SISMEMBER orderKey userId
    if( redis.call( 'SISMEMBER', orderKey, userId ) == 1 ) thenreturn 2
    end
    --3.4 扣庫存 incrby stockKey -1
    redis.call( 'INCRBY', stockKey, -1 )
    --3.5 下單(保存用戶) sadd orderKey userId
    redis.call( 'SADD', orderKey, userId )
    -- 3.6. 發(fā)送消息到隊列中
    redis.call( 'XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )
    
  6. 創(chuàng)建消費者類(ServiceImpl)

    1. 位置:作為 VoucherOrderServiceImpl 內(nèi)的私有類
    // 在ServiceImpl中創(chuàng)建一個VoucherOrderHandler消費者類,專門用于處理消息隊列中的消息
    private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1. 獲取消息隊列中的訂單信息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create( "stream.order", ReadOffset.lastConsumed()));// 2. 沒有消息則重新監(jiān)聽if (list == null || list.isEmpty() ) continue;// 3. 獲取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 創(chuàng)建訂單createVoucherOrder(voucherOrder);// 5. 確認當(dāng)前消息已消費 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch ( Exception e) {log.error("處理訂單異常", e);// 6. 處理訂單失敗則消息會加入pending-list,繼續(xù)處理pending-listhandlePendingList();}}}// 處理pending-list中的消息private void handlePendingList() {while(true) {try {// 1. 消費pending-list中的消息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),                                   // 消費者此消息的消費者StreamReadOptions.empty().count(1),                          // StreamOffset.create("stream.order", ReadOffset.from("0"))     // 從pending-list的第一條消息開始讀);// 2. 退出條件, list 為空 -> pending-list 已全部處理if(list == null || list.isEmpty()) break;// 3. 獲取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 創(chuàng)建訂單createVoucherOrder(voucherOrder);// 5. 確認消息已消費(XACK)stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("處理pendding訂單異常", e);try{Thread.sleep(20);     // 如果發(fā)生異常則休眠一會再重新消費pending-list中的消息} catch (Exception e2) {e.printStackTrace(); }}}}
    }
    
  7. 創(chuàng)建消息方法

    1. 目標(biāo):用戶通過這個方法發(fā)送一條創(chuàng)建訂單的 Message 給 Redis Stream
    // 創(chuàng)建Lua腳本對象
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// Lua腳本初始化 (通過靜態(tài)代碼塊)
    static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/SECKILL_SCRIPT.lua"));SECKILL_SCRIPT.setResultType(Long.class);
    }@Override
    public void createVoucherOrder(Long voucherId, Long userId) {// 生成訂單 ID(模擬)long orderId = System.currentTimeMillis();// 執(zhí)行 Lua 腳本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),                    // 使用空的 key 列表voucherId.toString(), userId.toString(), String.valueOf(orderId));// 根據(jù) Lua 腳本返回結(jié)果處理if (result == 1) {throw new RuntimeException("庫存不足!");} else if (result == 2) {throw new RuntimeException("不能重復(fù)下單!");}// 如果腳本執(zhí)行成功,則訂單消息會進入 Redis Stream,消費者組會自動處理System.out.println("訂單創(chuàng)建成功!");
    }
    

(缺陷) 單消費者模式

  1. 常用命令
    1. XADD key [NOMKSTREAM] [MAXLEN | MINID [=|~] threshold [LIMIT count] * | ID field value [field value …]
    2. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ ID … ]
  2. 缺陷:有消息漏讀風(fēng)險

五、其他消息隊列方案

(缺陷) List 實現(xiàn)

  1. 優(yōu)點
    1. 不受 JVM 內(nèi)存上限限制:因為利用 Redis 存儲
    2. 數(shù)據(jù)安全 :因為基于 List 結(jié)構(gòu)本身是數(shù)據(jù)存儲,基于 Redis 持久化機制
    3. 消息有序性:通過 List 結(jié)構(gòu)的 LPUSH & BRPOP 命令實現(xiàn)順序
  2. 缺點
    1. 消息丟失:BRPOP 的時候如果宕機則消息會丟失
    2. 只支持單消費者

(缺陷) PubSub 實現(xiàn)

  1. 定義
    1. Publish & Subscribe 模型,一種消息隊列模型
    2. 生產(chǎn)者向指定的 channel 來 public 消息
    3. 消費者從 subscribe 的 channel 中接收消息
  2. 功能:支持多消費者模式,多個消費者可以同時 subscribe 一個 channel
  3. 優(yōu)點:采用發(fā)布訂閱模型,支持多生產(chǎn)者、消費者
  4. 缺點
    1. 不支持數(shù)據(jù)持久化
    2. 無法避免消息丟失
    3. 消息堆積有上限,超出時數(shù)據(jù)丟失

三種消息隊列對比


http://aloenet.com.cn/news/34438.html

相關(guān)文章:

  • 網(wǎng)站策劃書最后一步怎么做采集站seo提高收錄
  • 寧夏銀川網(wǎng)站建設(shè)游戲app拉新平臺
  • 怎么做網(wǎng)站賺錢廣告營銷案例分析
  • wordpress openbox主題山東服務(wù)好的seo
  • 做彩票網(wǎng)站要什么接口互聯(lián)網(wǎng)推廣與營銷
  • 平面設(shè)計專用網(wǎng)站臨安網(wǎng)站seo
  • 內(nèi)力網(wǎng)站建設(shè)公司宣傳軟文
  • 做網(wǎng)站頁面的軟件海淀區(qū)seo搜索引擎
  • 網(wǎng)站建設(shè)常用英語網(wǎng)店運營
  • 中山做外貿(mào)網(wǎng)站建設(shè)百度小說排行榜完本
  • 做誘惑類cpa網(wǎng)站經(jīng)驗百度賬號注冊平臺
  • xp做的網(wǎng)站有連接限制seo優(yōu)化網(wǎng)站技術(shù)排名百度推廣
  • 沒有注冊公司怎么做網(wǎng)站性價比高seo排名
  • 無錫公司網(wǎng)站建設(shè)電話百度做網(wǎng)站需要多少錢
  • 濰坊市網(wǎng)站建設(shè)公司西部數(shù)碼域名注冊官網(wǎng)
  • 網(wǎng)站優(yōu)化推廣怎么做電商營銷策劃方案
  • 做公司網(wǎng)站需要有座機嗎微信crm客戶管理系統(tǒng)
  • 企業(yè)網(wǎng)站建設(shè)顧問百度推廣后臺登陸官網(wǎng)
  • 網(wǎng)站用什么語言開發(fā)百度搜索怎么優(yōu)化
  • 淮南北京網(wǎng)站建設(shè)新網(wǎng)站如何推廣
  • wap網(wǎng)站為什么沒有了沈陽網(wǎng)絡(luò)seo公司
  • 公司網(wǎng)站打不開網(wǎng)頁搜索引擎大全
  • 濟源城鄉(xiāng)建設(shè)局網(wǎng)站營銷推廣技巧
  • 有哪些做外貿(mào)免費的網(wǎng)站深圳網(wǎng)站設(shè)計專家樂云seo
  • 哪些網(wǎng)站專門做康復(fù)科seo網(wǎng)站優(yōu)化排名
  • 珍島網(wǎng)站模板最近社會熱點新聞事件
  • 重慶知名網(wǎng)站制作公司seo推廣方法
  • 音樂網(wǎng)站的設(shè)計與開發(fā)可以免費網(wǎng)絡(luò)推廣網(wǎng)站
  • 濟南網(wǎng)站建設(shè)優(yōu)化站長申論
  • java配合什么做網(wǎng)站獨立站seo外鏈平臺