做照片書的模板下載網(wǎng)站好惠州網(wǎng)站推廣排名
RabbitMQ3.13.x之十_流過濾的內(nèi)部結(jié)構(gòu)設(shè)計與實(shí)現(xiàn)
文章目錄
- RabbitMQ3.13.x之十_流過濾的內(nèi)部結(jié)構(gòu)設(shè)計與實(shí)現(xiàn)
- 1. 概念
- 1. 消息發(fā)布
- 2. 消息消費(fèi)
- 2. 流的結(jié)構(gòu)
- 1. 在代理端進(jìn)行過濾
- 2. 客戶端篩選
- 3. JavaAPI示例
- 4. 流過濾配置
- 5. AMQP上的流過濾
- 6. 總結(jié)
- 3. 相關(guān)鏈接
1. 概念
流過濾的思想是在代理端提供第一級的高效過濾,而無需代理解釋消息。 這樣,只需要流子集的使用者就不需要自己獲取所有數(shù)據(jù)并處理所有過濾。 這可以大大減少傳輸給消費(fèi)者的數(shù)據(jù)。
通過篩選,可以將篩選器值與每條消息相關(guān)聯(lián)。 它可以是地理信息,例如每條消息來自的世界區(qū)域,如下圖所示:
因此,我們的流有 1 條(綠色)消息、1 條(深藍(lán)色)消息、2 條(紫色)消息,然后是 2 條消息。AMER``APAC``EMEA``AMER
1. 消息發(fā)布
發(fā)布者可以將每封出站郵件與其篩選器值相關(guān)聯(lián):
在上圖中,發(fā)布者發(fā)布了 1 條(綠色)消息和 2 條(紫色)消息,這些消息將添加到流中。AMER``EMEA
2. 消息消費(fèi)
當(dāng)使用者訂閱時,它可以指定一個或多個過濾器值,并且代理應(yīng)僅發(fā)送具有此或這些過濾器值的消息。 我們很快就會看到這在實(shí)踐中有點(diǎn)不同,但這足以理解這些概念。
在下圖中,頂部的使用者指定它只想要(綠色)消息,而代理只發(fā)送這些消息。 中間的消費(fèi)者和底部的消費(fèi)者也是如此。AMER``EMEA``APAC
概念就到這里了,現(xiàn)在讓我們來了解一下實(shí)現(xiàn)細(xì)節(jié)。
2. 流的結(jié)構(gòu)
我們需要知道流是如何構(gòu)建的,以便了解流過濾的內(nèi)部結(jié)構(gòu)。 流是包含段文件的目錄。 每個區(qū)段文件都有一個關(guān)聯(lián)的索引文件(用于了解在區(qū)段文件中給定偏移量處附加使用者的位置等)。 擁有多個“小”段文件比為整個流擁有一個大的整體文件要好:例如,刪除“舊”段文件以截斷流比刪除大文件的開頭更有效、更安全。
區(qū)段文件由包含消息的塊組成。 區(qū)塊中的消息數(shù)取決于入口速率(高入口速率表示一個塊中的消息較多,低入口速率表示塊中的消息較少)。 塊中的消息數(shù)量從幾條(甚至 1 條)到幾千條不等。
塊是怎么回事? 塊是流中的工作單元:它們用于復(fù)制,更重要的是,對于我們的主題,用于消費(fèi)者交付。 代理使用 sendfile
系統(tǒng)調(diào)用(將整個塊從文件系統(tǒng)發(fā)送到網(wǎng)絡(luò)套接字,而不將數(shù)據(jù)復(fù)制到用戶空間)向使用者發(fā)送塊,一次一個。
下圖說明了流的結(jié)構(gòu):
有了這個,讓我們看看代理如何知道是否要調(diào)度一個區(qū)塊。
1. 在代理端進(jìn)行過濾
想象一下,我們有一個只想要(綠色)消息的消費(fèi)者。 當(dāng)代理要調(diào)度一個區(qū)塊時,它需要知道該區(qū)塊是否包含消息。 如果是這樣,它可以將塊發(fā)送給消費(fèi)者,如果沒有,代理可以跳過該塊,轉(zhuǎn)到下一個塊,然后重新迭代。AMER``AMER
每個區(qū)塊都有一個標(biāo)頭,該標(biāo)頭可以包含一個 Bloom 過濾器,該標(biāo)頭告訴代理該塊是否包含具有給定過濾器值的消息。 Bloom 過濾器是一種節(jié)省空間的概率數(shù)據(jù)結(jié)構(gòu),用于測試元素是否是集合的成員。 在我們的示例中,集合包含 、 和 ,元素是 。AMER``EMEA``APAC``AMER
下圖說明了 3 個塊的代理端過濾過程:
如上圖所示,篩選器可能會返回誤報,即不包含具有預(yù)期篩選器值的消息的塊。 這是正常的,因?yàn)?Bloom 過濾器是概率性的。 不過,它們不會返回假陰性:如果過濾器顯示沒有(綠色)消息,我們可以確定它是真的。 我們必須忍受這種不確定性:有時我們可能會無緣無故地調(diào)度一些塊,但這總比調(diào)度所有塊要好。AMER
可以肯定的是,消費(fèi)者可以接收到它不想要的消息:看看我們左邊的第一個塊,它包含消費(fèi)者要求的(綠色)消息,但也包含(紫色)和(深藍(lán)色)消息。 這就是為什么客戶端也必須進(jìn)行過濾的原因。AMER``EMEA``APAC
2. 客戶端篩選
代理在傳遞消息時處理第一級過濾,但由于傳遞單位是塊,因此使用者仍然可以接收它不想要的消息。 因此,客戶端還必須執(zhí)行一些篩選,這顯然必須與訂閱時設(shè)置的篩選值一致。
下圖說明了一個消費(fèi)者,它只需要(綠色)消息,并且必須執(zhí)行最后一步的篩選:AMER
讓我們看看這如何轉(zhuǎn)化為應(yīng)用程序代碼。
3. JavaAPI示例
篩選不是侵入性的,可以作為跨領(lǐng)域問題進(jìn)行處理,從而最大限度地減少對應(yīng)用程序代碼的影響。 以下是在使用流 Java 客戶端(方法)聲明生產(chǎn)者時設(shè)置從消息中提取過濾器值的邏輯:filterValue(Function<Message,String>)
Producer producer = environment.producerBuilder().stream("invoices").filterValue(msg -> msg.getApplicationProperties().get("region").toString()) .build();
在消費(fèi)端,流 Java 客戶端提供了設(shè)置過濾器值的方法和設(shè)置客戶端過濾邏輯的方法。 聲明使用者時,必須調(diào)用這兩種方法:filter().values(String... filterValues)``filter().postFilter(Predicate<Message> filter)
Consumer consumer = environment.consumerBuilder().stream("invoices").filter().values("AMER") .postFilter(msg -> "AMER".equals(msg.getApplicationProperties().get("region"))) .builder().messageHandler((ctx, msg) -> {// message processing code}).build();
如您所見,篩選不會更改發(fā)布和使用代碼,而只是更改生產(chǎn)者和使用者的聲明。
現(xiàn)在讓我們看看如何以最合適的方式為用例配置流過濾。
4. 流過濾配置
關(guān)于流過濾的第一篇文章提供了一些數(shù)字(與不過濾相比,過濾節(jié)省了大約 80% 的帶寬)。 流過濾的好處很大程度上取決于用例:入口速率、基數(shù)和過濾器值的分布,以及過濾器大小。 過濾器越大越好(錯誤率越小)。 可以為塊中使用的篩選器大小設(shè)置一個介于 16 到 255 字節(jié)之間的值,默認(rèn)值為 16 字節(jié)。
流 Java 客戶端提供了在創(chuàng)建流時設(shè)置過濾器大小的方法(它在內(nèi)部設(shè)置參數(shù)):filterSize(int)``stream-filter-size-bytes
environment.streamCreator().stream("invoices").filterSize(32).create()
如何估算過濾器的尺寸? 網(wǎng)上有許多 Bloom 濾鏡計算器。 參數(shù)包括哈希函數(shù)的數(shù)量(RabbitMQ 流過濾為 2 個)、預(yù)期元素的數(shù)量、錯誤率和大小。 您通常對元素的數(shù)量有所了解,因此您需要在錯誤率和過濾器大小之間找到權(quán)衡。
以下是一些示例:
- 10 個值,16 個字節(jié) => 2 % 錯誤率
- 30 個值,16 個字節(jié) => 14 % 錯誤率
- 200 個值,128 個字節(jié) => 10 % 的錯誤率
那么,過濾器越大越好? 不完全是:盡管 Bloom 過濾器在存儲方面非常有效,因?yàn)樗淮鎯υ?#xff0c;只是元素是否在集合中,過濾器大小是預(yù)先分配的。 如果將篩選器大小設(shè)置為 255,并且每個塊至少包含一條具有篩選器值的消息,則每個塊標(biāo)頭中將分配 255 個字節(jié)。 如果塊包含許多大消息,這很好,因?yàn)榕c塊大小相比,篩選器大小可以忽略不計。 但是,對于退化的情況,例如具有 10 字節(jié)消息和 10 字節(jié)篩選器值的單消息塊,您最終會得到一個比實(shí)際數(shù)據(jù)更大的篩選器。
您必須嘗試自己的用例,以估計過濾器大小對流大小的影響。 關(guān)于流過濾的第一篇文章提供了一個使用 Stream PerfTest 估計流大小的技巧(在不過濾的情況下讀取整個流并查閱指標(biāo))。rabbitmq_stream_read_bytes_total
5. AMQP上的流過濾
盡管訪問流的首選方式是流協(xié)議,但支持其他協(xié)議,例如 AMQP。 任何 AMQP 客戶端庫也支持流篩選:
- 聲明:將參數(shù)設(shè)置為 并使用 在聲明流時設(shè)置篩選器大小。
x-queue-type``stream``x-stream-filter-size-bytes
- 發(fā)布:使用標(biāo)頭設(shè)置出站郵件的篩選器值。
x-stream-filter-value
- 使用:使用 consumer 參數(shù)設(shè)置預(yù)期的篩選器值(字符串或字符串?dāng)?shù)組),并使用 consumer 參數(shù)(可選)接收沒有任何篩選值的消息(默認(rèn)值為 )??蛻舳诉^濾仍然是必要的!
x-stream-filter``x-stream-match-unfiltered``false
6. 總結(jié)
流過濾易于使用并從中受益,但有關(guān)內(nèi)部的一些知識可用于優(yōu)化其使用,尤其是對于棘手的用例。 請記住,客戶端篩選是必需的,并且必須與配置的篩選器值一致。 這通常很容易實(shí)現(xiàn)。 還可以為給定的用例以最合適的方式設(shè)置過濾器大小。
3. 相關(guān)鏈接
參考:
Stream Filtering Internals | RabbitMQ