合肥網(wǎng)站建站報廣告代理在線之家
????????
目錄
1、生產(chǎn)者發(fā)送消息的 2 種方式
2、生產(chǎn)者訪問主題???????的 4 種模式
3、消息壓縮
4、消息批量發(fā)送
5、消息分塊發(fā)送
????????生產(chǎn)者(producer)是附加主題(topic)并把消息(messages)發(fā)送到 Pulsar broker 的程序。Pulsar broker 會處理接收到的消息(messages)。
1、生產(chǎn)者???????發(fā)送消息的 2 種方式
????????生產(chǎn)者(Producers)發(fā)送消息(messages)到?brokers 可以是同步的(sync),也可以是異步的(async) // 發(fā)送方式分為同步和異步兩種方式
Mode | Description |
---|---|
Sync send 同步發(fā)送 | The producer waits for an acknowledgement from the broker after sending every message. If the acknowledgment is not received, the producer treats the sending operation as a failure. // 生產(chǎn)者發(fā)送的每一條消息都需要等待broker的確認,如果沒有收到確認,生產(chǎn)者認為此次消息發(fā)送失敗 |
Async send 異步發(fā)送 | The producer puts a message in a blocking queue and returns immediately. The client library sends the message to the broker in the background. If the queue is full (you can?configure?the maximum size), the producer is blocked or fails immediately when calling the API, depending on arguments passed to the producer. // 生產(chǎn)者將消息放入阻塞隊列并立即返回??蛻舳嗽诤笈_將消息發(fā)送給代理。如果隊列已滿(可以配置大小),則根據(jù)傳遞給生產(chǎn)者的參數(shù),生產(chǎn)者會被阻止或在調(diào)用 API 時立即失敗。 |
2、生產(chǎn)者???????訪問主題???????的 4 種模式
????????對于主題,生產(chǎn)者有以下不同類型的訪問模式:
Access mode | Description |
---|---|
| Multiple producers can publish on a topic. // 默認配置 |
| Only one producer can publish on a topic. // 當(dāng)一個生產(chǎn)者因為網(wǎng)絡(luò)中斷連接,broker 會選擇一個新的 producer 成為專用生產(chǎn)者 |
| Only one producer can publish on a topic. // 如果已經(jīng)有生產(chǎn)者連接,其他生產(chǎn)者將被移除并且立即失效 |
等待獨占 | If there is already a producer connected, the producer creation is pending (rather than timing out) until the producer gets the? // 成功獲取獨占訪問權(quán)的生產(chǎn)者將被視為一個leader,因此,如果你想為應(yīng)用程序?qū)崿F(xiàn)一個leader選舉機制,可以選用該訪問模式。需要注意的是,leader機制涉及到Pulsar的WAL日志,leader會把”決策“寫入到主題。錯誤情況,當(dāng)leader寫入消息失敗,會得到broker的通知,通知該生產(chǎn)者將不再是一個leader。 |
注意事項
? ? ? ? ??一旦應(yīng)用程序成功創(chuàng)建了?Exclusive(獨占)?或?WaitForExclusive(等待獨占)訪問模式的生產(chǎn)者,那么此生產(chǎn)者需要保證是主題的唯一訪問者。任何其他試圖訪問此主題的生產(chǎn)者都會立即出錯或者必須等待,直到他們獲得主題的獨占訪問權(quán)。// 獨占模式具有排他性
3、消息壓縮
????????可以壓縮生產(chǎn)者在傳輸過程中發(fā)布的消息。Pulsar 目前支持以下類型的壓縮:
- LZ4:LZ算法系列的一種,號稱是目前最快的壓縮算法之一
- ZLIB:zlib是用于數(shù)據(jù)壓縮的一個簡單的庫,僅支持LZ77的變種算法
- ZSTD:Facebook開源的新無損壓縮算法,優(yōu)點是壓縮率和壓縮/解壓縮性能都很突出
- SNAPPY:提供高速壓縮速度和合理的壓縮率。Snappy?比 zlib 更快,但文件相對要大 20% 到 100%。
????????詳情信息,請點擊這里。
????????壓縮的原理:假如當(dāng)前位置的一個字符串序列,在以前的歷史數(shù)據(jù)中也出現(xiàn)過,那么現(xiàn)在用一種特殊的格式或者特殊的小序列來表示它,就可以起到壓縮的效果,因為特殊格式或者特殊小序列通常都比原本的字符串序列更小。
4、消息批量發(fā)送
????????啟用批量處理后,生產(chǎn)者(producer)在單個請求中累積并發(fā)送一批消息(messages)。批量大小由最大消息數(shù)和最大發(fā)布延遲決定。因此,backlog 大小表示批量的大小,而不是消息的大小。// 等消息積攢到一定數(shù)量再一起發(fā)送
????????在 Pulsar 中,批量消息作為單個單元而不是單個消息進行跟蹤和存儲。消費者需要將批量消息拆分為單個消息進行處理。但是,即使啟用了批處理,預(yù)先計劃的 messages(通過 deliverAt 或 deliverAfter 參數(shù)配置)也始終只作為單獨的消息發(fā)送。
????????通常,當(dāng)批量中的所有消息都被消費者確認時,該批量也會被確認。但是,當(dāng)批量中有消息沒有被確認,或者出現(xiàn)意外的失敗,否定的確認以及確認超時會導(dǎo)致批量中的所有消息都重新的傳遞。
????????為了避免將批量中已確認的消息重新發(fā)送給消費者,Pulsar 從 Pulsar 2.6.0 開始引入批量索引確認機制。啟用批量索引確認機制后,消費者會過濾出已經(jīng)確認過的批量索引,并將批量索引確認請求發(fā)送給代理(broker)。代理(broker)會維護和跟蹤每個批量索引的確認狀態(tài),并避免向消費者(consumer)發(fā)送已確認過的消息。當(dāng)批量中的所有消息都被確認后,該批量將會被刪除。// 批量索引會帶來內(nèi)存開銷,但是可以避免數(shù)據(jù)重復(fù)消費
????????默認情況下,禁用批量索引確認機制(AcknowledgementAtBatchIndexLevelEnabled=false)。但是可以通過在代理端(broker)將 AcknowledgementAtBatchIndexLevelEnabled 參數(shù)設(shè)置為 true 來啟用批量索引確認機制。啟用批量索引確認機制會導(dǎo)致更多內(nèi)存開銷。
5、消息分塊發(fā)送
????????消息分塊,使 Pulsa 的生產(chǎn)者和消費者都能處理大型有效負載消息(在生產(chǎn)者端將消息分塊,在消費端將消息聚合)。
????????啟用消息分塊后,當(dāng)消息(messages)大小超過允許的最大負載大小( broker 的 maxMessageSize 參數(shù))時,消息的工作流如下所示:
- 生產(chǎn)者(producer)將原始消息拆分為分塊消息,并將它們與分塊元數(shù)據(jù)一起按順序單獨發(fā)布到代理。
- 代理(broker)以與普通消息相同的方式將分塊消息存儲在一個 managed-ledger(托管賬本)中,并使用?chunkedMessageRate?參數(shù)記錄該主題上分塊消息的速率。
- 消費者(consumer)會緩存分塊的消息,并在接收到消息的所有分塊時將其聚合到接收隊列中。
- 客戶端(client)消費接收隊列中的聚合消息。
消息分塊的限制:
- 只能用于持久化主題
- 只能用于獨占(exclusive)訪問和容錯訂閱(failover subscription)類型。
- 不能和批處理同時使用
處理連續(xù)的分塊消息
????????下圖展示了處理連續(xù)分塊消息的過程。圖中,生產(chǎn)者依次往一個主題中發(fā)布分塊消息(大型消息)和非分塊消息(常規(guī)消息 M3 \ M4)。生產(chǎn)者(producer)在發(fā)布 M1 消息時,把 M1 分成了 M1-C1、M1-C2 和 M1-C3 三個塊消息。broker 端會存儲所有的分塊消息(放在 managed-ledger 中),并把他們按照相同的順序發(fā)送給消費者。消費者(consumer)會在內(nèi)存中緩存接收到的分塊消息,直到全部接收,然后把它們聚合為原始的消息 M1,最后將原始的消息 M1 移交給客戶端(client)。
處理不連續(xù)的分塊消息
????????當(dāng)多個生產(chǎn)者將分塊消息發(fā)布到單個主題中時,代理(broker)將來自不同生產(chǎn)者的所有分塊消息都存儲在同一個托管賬本(managed-ledger)中。托管賬本中的分塊消息相互交錯,如下所示,生產(chǎn)者-1將消息 M1 分為三個數(shù)據(jù)塊 M1-C1、M1-C2 和 M1-C3 進行發(fā)布。生產(chǎn)者-2將消息? M2 也分為三個塊 M2-C1、M2-C2 和 M2-C3 進行發(fā)布。但是特定消息的分塊消息仍處于有序狀態(tài),雖然他們在托管分類中可能不是連續(xù)的。// 分塊通過消費者進行合并處理后再發(fā)送發(fā)給客戶端
注意事項
????????在這種情況下,交錯的分塊消息可能會給消費者帶來一些內(nèi)存壓力,因為消費者為每個大型消息都保留了單獨的緩沖區(qū),以便將其所有的分塊消息聚合成一條消息。通過配置maxPendingChunkedMessage參數(shù),可以限制消費者并發(fā)維護的最大分塊消息數(shù)量。當(dāng)維護量達到數(shù)量閾值時,消費者會暫時丟棄這些消息,隨后不發(fā)送消息確認,或者要求代理重傳來進行消息補償,從而優(yōu)化內(nèi)存利用率。
啟用消息分塊
????????前提條件:通過將?enableBatching?參數(shù)設(shè)置為 false 來禁用批量處理。
????????默認情況下,消息分塊功能處于關(guān)閉狀態(tài)。要啟用消息分塊,請在創(chuàng)建生產(chǎn)者時將?chunkingEnabled?參數(shù)設(shè)置為 true。
注意事項
????????如果消費者未能在指定的時間段內(nèi)收到消息的所有分塊,那么不完整的分塊消息將過期。過期的時間默認值為1分鐘。有關(guān)?ExpireTimeofCompletechUnkedMessage?參數(shù)的更多信息,請參閱 org。
點擊回到首頁