做測(cè)試題的網(wǎng)站關(guān)鍵詞優(yōu)化的最佳方法
1、 kafka 是什么,有什么作用
2、Kafka為什么這么快
3、Kafka架構(gòu)及名詞解釋
4、Kafka中的AR、ISR、OSR代表什么
5、HW、LEO代表什么
6、ISR收縮性
7、kafka follower如何與leader同步數(shù)據(jù)
8、Zookeeper 在 Kafka 中的作用(早期)
9、Kafka如何快速讀取指定offset的消息
10、生產(chǎn)者發(fā)送消息有哪些模式
11、發(fā)送消息的分區(qū)策略有哪些
12、Kafka可靠性保證(不丟消息)
13、Kafka 是怎么去實(shí)現(xiàn)負(fù)載均衡的
14、簡(jiǎn)述Kafka的Rebalance機(jī)制
15、Kafka 負(fù)載均衡會(huì)導(dǎo)致什么問題
16、如何增強(qiáng)消費(fèi)者的消費(fèi)能力
17、消費(fèi)者與Topic的分區(qū)策略
18、如何保證消息不被重復(fù)消費(fèi)(消費(fèi)者冪等性)
19、為什么Kafka不支持讀寫分離
20、Kafka選舉機(jī)制
21、腦裂問題
22、如何為Kafka集群選擇合適的Topics/Partitions數(shù)量
23、Kafka 分區(qū)數(shù)可以增加或減少嗎?為什么
24、談?wù)勀銓?duì)Kafka生產(chǎn)者冪等性的了解
25、談?wù)勀銓?duì) Kafka事務(wù)的了解?
26、Kafka消息是采用Pull模式,還是Push模式?
27、Kafka缺點(diǎn)
28、Kafka什么時(shí)候會(huì)丟數(shù)據(jù)
29、Kafka分區(qū)數(shù)越多越好嗎?
30、Kafka如何保證消息的有序性
31、Kafka精確一次性(Exactly-once)如何保證
1、 kafka 是什么,有什么作用
Kafka是一個(gè)開源的高吞吐量的分布式消息中間件,對(duì)比于其他 1) 緩沖和削峰:上游數(shù)據(jù)時(shí)有突發(fā)流量,下游可能扛不住,或者下游沒有足夠多的機(jī)器來保證冗余,kafka在中間可以起到一個(gè)緩沖的作用,把消息暫存在kafka中,下游服務(wù)就可以按照自己的節(jié)奏進(jìn)行慢慢處理。
1) 解耦和擴(kuò)展性:項(xiàng)目開始的時(shí)候,并不能確定具體需求。消息隊(duì)列可以作為一個(gè)接口層,解耦重要的業(yè)務(wù)流程。只需要遵守約定,針對(duì)數(shù)據(jù)編程即可獲取擴(kuò)展能力。
1) 冗余:可以采用一對(duì)多的方式,一個(gè)生產(chǎn)者發(fā)布消息,可以被多個(gè)訂閱topic的服務(wù)消費(fèi)到,供多個(gè)毫無關(guān)聯(lián)的業(yè)務(wù)使用。
1) 健壯性:消息隊(duì)列可以堆積請(qǐng)求,所以消費(fèi)端業(yè)務(wù)即使短時(shí)間死掉,也不會(huì)影響主要業(yè)務(wù)的正常進(jìn)行。
1) 異步通信:很多時(shí)候,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時(shí)候再去處理它們。
2、Kafka為什么這么快
- 利用 Partition 實(shí)現(xiàn)并行處理 不同 Partition 可位于不同機(jī)器,因此可以充分利用集群優(yōu)勢(shì),實(shí)現(xiàn)機(jī)器間的并行處理。另一方面,由于 Partition 在物理上對(duì)應(yīng)一個(gè)文件夾,即使多個(gè) Partition 位于同一個(gè)節(jié)點(diǎn),也可通過配置讓同一節(jié)點(diǎn)上的不同 Partition 置于不同的磁盤上,從而實(shí)現(xiàn)磁盤間的并行處理,充分發(fā)揮多磁盤的優(yōu)勢(shì)。
- 利用了現(xiàn)代操作系統(tǒng)分頁存儲(chǔ) Page Cache 來利用內(nèi)存提高 I/O 效率
- 順序?qū)?kafka的消息是不斷追加到文件中的,這個(gè)特性使kafka可以充分利用磁盤的順序讀寫性能 由于現(xiàn)代的操作系統(tǒng)提供了預(yù)讀和寫技術(shù),磁盤的順序?qū)懘蠖鄶?shù)情況下比隨機(jī)寫內(nèi)存還要快。順序讀寫不需要硬盤磁頭的尋道時(shí)間,只需很少的扇區(qū)旋轉(zhuǎn)時(shí)間,所以速度遠(yuǎn)快于隨機(jī)讀寫
- Zero-copy 零拷技術(shù)減少拷貝次數(shù)
- 數(shù)據(jù)批量處理。合并小的請(qǐng)求,然后以流的方式進(jìn)行交互,直頂網(wǎng)絡(luò)上限。在很多情況下,系統(tǒng)的瓶頸不是 CPU 或磁盤,而是網(wǎng)絡(luò)IO。因此,除了操作系統(tǒng)提供的低級(jí)批處理之外,Kafka 的客戶端和 broker 還會(huì)在通過網(wǎng)絡(luò)發(fā)送數(shù)據(jù)之前,在一個(gè)批處理中累積多條記錄 (包括讀和寫)。記錄的批處理分?jǐn)偭司W(wǎng)絡(luò)往返的開銷,使用了更大的數(shù)據(jù)包從而提高了帶寬利用率。
- Pull 拉模式 使用拉模式進(jìn)行消息的獲取消費(fèi),與消費(fèi)端處理能力相符。
- 數(shù)據(jù)壓縮 Kafka還支持對(duì)消息集合進(jìn)行壓縮,Producer可以通過GZIP、Snappy、LZ4格式對(duì)消息集合進(jìn)行壓縮,數(shù)據(jù)壓縮一般都是和批處理配套使用來作為優(yōu)化手段的。壓縮的好處就是減少傳輸?shù)臄?shù)據(jù)量,減輕對(duì)網(wǎng)絡(luò)傳輸?shù)膲毫?Producer壓縮之后,在Consumer需進(jìn)行解壓,雖然增加了CPU的工作,但在對(duì)大數(shù)據(jù)處理上,瓶頸在網(wǎng)絡(luò)上而不是CPU,所以這個(gè)成本很值得
3、Kafka架構(gòu)及名詞解釋
簡(jiǎn)易架構(gòu)圖如下:
詳細(xì)架構(gòu)圖如下
- Broker :一臺(tái)kafka服務(wù)器就是一個(gè)broker。一個(gè)集群由多個(gè)broker組成。一個(gè)broker可以容納多個(gè)topic。
- Producer:消息生產(chǎn)者,向kafka broker發(fā)送消息的客戶端。
- Consumer:消息消費(fèi)者,向kafka broker取消息的客戶端。
- Topic:隊(duì)列,生產(chǎn)者和消費(fèi)者通過此進(jìn)行對(duì)接。
- Consumer Group (CG):若干個(gè)Consumer組成的集合。這是kafka用來實(shí)現(xiàn)一個(gè)topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個(gè)consumer)的手段。一個(gè)topic可以有多個(gè)CG。topic的消息會(huì)復(fù)制(不是真的復(fù)制,是概念上的)到所有的CG,但每個(gè)CG只會(huì)把消息發(fā)給該CG中的一個(gè)consumer。如果需要實(shí)現(xiàn)廣播,只要每個(gè)consumer有一個(gè)獨(dú)立的CG就可以了。要實(shí)現(xiàn)單播只要所有的consumer在同一個(gè)CG。用CG還可以將consumer進(jìn)行自由的分組而不需要多次發(fā)送消息到不同的topic。
- Partition:分區(qū),為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)topic可以分布在多個(gè)broker上,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition都是一個(gè)有序的隊(duì)列。partition中的每條消息都會(huì)被分配一個(gè)有序的id(offset)。kafka只保證同一個(gè)partition中的消息順序,不保證一個(gè)topic的整體(多個(gè)partition之間)的順序。生產(chǎn)者和消費(fèi)者使用時(shí)可以指定topic中的具體partition。
- 副本:在kafka中,每個(gè)主題可以有多個(gè)分區(qū),每個(gè)分區(qū)又可以有多個(gè)副本。這多個(gè)副本中,只有一個(gè)是leader,而其他的都是follower副本。僅有l(wèi)eader副本可以對(duì)外提供服務(wù)。多個(gè)follower副本通常存放在和leader副本不同的broker中。通過這樣的機(jī)制實(shí)現(xiàn)了高可用,當(dāng)某臺(tái)機(jī)器掛掉后,其他follower副本也能迅速”轉(zhuǎn)正“,開始對(duì)外提供服務(wù)。
- offset:消費(fèi)偏移量,topic中的每個(gè)分區(qū)都是有序且順序不可變的記錄集,并且不斷地追加到結(jié)構(gòu)化的log文件。分區(qū)中的每一個(gè)記錄都會(huì)分配一個(gè)id號(hào)來表示順序,我們稱之為offset,offset用來唯一的標(biāo)識(shí)分區(qū)中每一條記錄。可以設(shè)置為“自動(dòng)提交”與“手動(dòng)提交”。
4、Kafka中的AR、ISR、OSR代表什么
- AR:Assigned Replicas 指當(dāng)前分區(qū)中的所有副本。
- ISR:In-Sync Replicas 副本同步隊(duì)列。ISR中包括Leader和Foller。如果Leader進(jìn)程掛掉,會(huì)在ISR隊(duì)列中選擇一個(gè)服務(wù)作為新的Leader。有replica.lag.max.message(延遲條數(shù))和replica.lag.time.max.ms(延遲時(shí)間)兩個(gè)參數(shù)決定一臺(tái)服務(wù)器是否可以加入ISR副本隊(duì)列,在0.10版本之后移除了replica.lag.max.message(延遲條數(shù))參數(shù),防治服務(wù)頻繁的進(jìn)出隊(duì)列。任意一個(gè)維度超過閾值都會(huì)把Follower踢出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也會(huì)先存放在OSR中。
- OSR:(Out-of-Sync Replicas)非同步副本隊(duì)列。與leader副本同步滯后過多的副本(不包括leader副本)組成OSR。如果OSR集合中有follower副本“追上”了leader副本,那么leader副本會(huì)把它從OSR集合轉(zhuǎn)移至ISR集合。默認(rèn)情況下,當(dāng)leader副本發(fā)生故障時(shí),只有在ISR集合中的副本才有資格被選舉為新的leader,而在OSR集合中的副本則沒有任何機(jī)會(huì)(不過這個(gè)原則也可以通過修改unclean.leader.election.enable參數(shù)配置來改變)。unclean.leader.election.enable 為true的話,意味著非ISR集合的broker 也可以參與選舉,這樣就有可能發(fā)生數(shù)據(jù)丟失和數(shù)據(jù)不一致的情況,Kafka的可靠性就會(huì)降低;而如果unclean.leader.election.enable參數(shù)設(shè)置為false,Kafka的可用性就會(huì)降低。
ISR的伸縮:1)Leader跟蹤維護(hù)ISR中follower滯后狀態(tài),落后太多或失效時(shí),leade把他們從ISR剔除。2)OSR中follower“追上”Leader,在ISR中才有資格選舉leader。
5、HW、LEO代表什么
- LEO (Log End Offset),標(biāo)識(shí)當(dāng)前日志文件中下一條待寫入的消息的offset。上圖中offset為9的位置即為當(dāng)前日志文件的 LEO,LEO 的大小相當(dāng)于當(dāng)前日志分區(qū)中最后一條消息的offset值加1.分區(qū) ISR 集合中的每個(gè)副本都會(huì)維護(hù)自身的 LEO ,而 ISR 集合中最小的 LEO 即為分區(qū)的 HW,對(duì)消費(fèi)者而言只能消費(fèi) HW 之前的消息。
- HW:replica高水印值,副本中最新一條已提交消息的位移。leader 的HW值也就是實(shí)際已提交消息的范圍,每個(gè)replica都有HW值,但僅僅leader中的HW才能作為標(biāo)示信息。什么意思呢,就是說當(dāng)按照參數(shù)標(biāo)準(zhǔn)成功完成消息備份(成功同步給follower replica后)才會(huì)更新HW的值,代表消息理論上已經(jīng)不會(huì)丟失,可以認(rèn)為“已提交”。
6、ISR收縮性
啟動(dòng) Kafka時(shí)候自動(dòng)開啟的兩個(gè)定時(shí)任務(wù),“isr-expiration"和”isr-change-propagation"。
- isr-expiration:isr-expiration任務(wù)會(huì)周期性的檢測(cè)每個(gè)分區(qū)是否需要縮減其ISR集合,相當(dāng)于一個(gè)紀(jì)檢委員,巡查尖子班時(shí)候發(fā)現(xiàn)有學(xué)生睡覺打牌看小說,就把它的座位移除尖子班,縮減ISR,寧缺毋濫。同樣道理,如果follower數(shù)據(jù)同步趕上leader,那么該follower就能進(jìn)入ISR尖子班,擴(kuò)充。上面關(guān)于ISR尖子班人員的所見,都會(huì)記錄到isrChangeSet中,想象成是一個(gè)名單列表,誰能進(jìn),誰要出,都記錄在案。
- isr-change-propagation:作用就是檢查isrChangeSet,按照名單上的信息移除和遷入,一般是2500ms檢查一次,但是為了防止頻繁收縮擴(kuò)充影響性能,不是每次都能做變動(dòng),必須滿足:1、上一次ISR集合發(fā)生變化距離現(xiàn)在已經(jīng)超過5秒,2、上一次寫入zookeeper的時(shí)候距離現(xiàn)在已經(jīng)超過60秒。這兩個(gè)條件都滿足,那么就開始換座位!這兩個(gè)條件可以由我們來配置。
- Kafka使用這種ISR收縮的方式有效的權(quán)衡了數(shù)據(jù)可靠性與性能之間的關(guān)系。
7、kafka follower如何與leader同步數(shù)據(jù)
Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。完全同步復(fù)制要求All Alive Follower都復(fù)制完,這條消息才會(huì)被認(rèn)為commit,這種復(fù)制方式極大的影響了吞吐率。而異步復(fù)制方式下,Follower異步的從Leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被Leader寫入log就被認(rèn)為已經(jīng)commit,這種情況下,如果leader掛掉,會(huì)丟失數(shù)據(jù),kafka使用ISR的方式很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。Follower可以批量的從Leader復(fù)制數(shù)據(jù),而且Leader充分利用磁盤順序讀以及send file(zero copy)機(jī)制,這樣極大的提高復(fù)制性能,內(nèi)部批量寫磁盤,大幅減少了Follower與Leader的消息量差。
8、Zookeeper 在 Kafka 中的作用(早期)
zookeeper 是一個(gè)分布式的協(xié)調(diào)組件,早期版本的kafka用zk做meta信息存儲(chǔ),consumer的消費(fèi)狀態(tài),group的管理以及 offset的值。考慮到zk本身的一些因素以及整個(gè)架構(gòu)較大概率存在單點(diǎn)問題,新版本中逐漸弱化了zookeeper的作用。新的consumer使用了kafka內(nèi)部的group coordination協(xié)議,也減少了對(duì)zookeeper的依賴,
但是broker依然依賴于ZK,zookeeper 在kafka中還用來選舉controller 和 檢測(cè)broker是否存活等等。
1. Broker注冊(cè):Broker是分布式部署并且互相獨(dú)立,此時(shí)需要有一個(gè)注冊(cè)系統(tǒng)能夠?qū)⒄麄€(gè)集群中的Broker管理起來,此時(shí)就用到的Zookeeper。在Zookeeper上會(huì)有一個(gè)專門用來進(jìn)行Broker服務(wù)器列表記錄的節(jié)點(diǎn):/brokes/ids
2.Topic注冊(cè):在kafka中,同一個(gè)Topic的消息會(huì)被分成多個(gè)分區(qū)并將其分布在多個(gè)Broker上,這些分區(qū)信息以及與Broker的對(duì)應(yīng)關(guān)系也都是由Zookeeper維護(hù),由專門的節(jié)點(diǎn)記錄:/brokers/topics
3.消費(fèi)者注冊(cè):消費(fèi)者服務(wù)器在初始化啟動(dòng)時(shí)加入消費(fèi)者分組的步驟如下:注冊(cè)到消費(fèi)者分組。每個(gè)消費(fèi)者服務(wù)器啟動(dòng)時(shí),都會(huì)到Zookeeper的指定節(jié)點(diǎn)下創(chuàng)建一個(gè)屬于自己的消費(fèi)者節(jié)點(diǎn),例如/consumer/[groupid]/ids/[consumerid],完成節(jié)點(diǎn)創(chuàng)建后,消費(fèi)者就會(huì)將自己訂閱的Topic信息寫入該臨時(shí)節(jié)點(diǎn)。
- 對(duì)消費(fèi)者分組中的消費(fèi)者的變化注冊(cè)監(jiān)聽:每個(gè)消費(fèi)者都需要關(guān)注所屬消費(fèi)者分組中的其他消費(fèi)者服務(wù)器的變化情況,即對(duì)/consumer/[group_id]/ids節(jié)點(diǎn)注冊(cè)子節(jié)點(diǎn)變化的Watcher監(jiān)聽,一旦發(fā)現(xiàn)消費(fèi)者新增或減少,就觸發(fā)消費(fèi)者的負(fù)載均衡。
- 對(duì)Broker服務(wù)器變化注冊(cè)監(jiān)聽:消費(fèi)者需要對(duì)/broker/ids[0-N]中的節(jié)點(diǎn)進(jìn)行監(jiān)聽,如果發(fā)現(xiàn)Broker服務(wù)器列表發(fā)生變化,那么就根據(jù)具體情況來決定是否需要進(jìn)行消費(fèi)者負(fù)載均衡。
- 進(jìn)行消費(fèi)者負(fù)載均衡:為了讓同一個(gè)Topic下不同分區(qū)的消息盡量均衡地被多個(gè)消費(fèi)者消費(fèi)而進(jìn)行消費(fèi)者與消息分區(qū)分配的過程,通常對(duì)于一個(gè)消費(fèi)者分組,如果組內(nèi)的消費(fèi)者服務(wù)器發(fā)生變更或Broker服務(wù)器發(fā)生變更,會(huì)進(jìn)行消費(fèi)者負(fù)載均衡。
- Offset記錄 在消費(fèi)者對(duì)指定消息分區(qū)進(jìn)行消費(fèi)的過程中,需要定時(shí)地將分區(qū)消息的消費(fèi)進(jìn)度Offset記錄到Zookeeper上,以便對(duì)該消費(fèi)者進(jìn)行重啟或者其他消費(fèi)者重新接管該消息分區(qū)的消息消費(fèi)后,能夠從之前的進(jìn)度繼續(xù)進(jìn)行消息消費(fèi)。Offset在Zookeeper中由一個(gè)專門節(jié)點(diǎn)進(jìn)行記錄,其節(jié)點(diǎn)路徑為:/consumers/[groupid]/offsets/[topic]/[brokerid-partition_id] 節(jié)點(diǎn)內(nèi)容就是Offset的值。
4.生產(chǎn)者負(fù)載均衡:由于同一個(gè)Topic消息會(huì)被分區(qū)并將其分布在多個(gè)Broker上,因此生產(chǎn)者需要將消息合理地發(fā)送到這些分布式的Broker上,那么如何實(shí)現(xiàn)生產(chǎn)者的負(fù)載均衡,Kafka支持傳統(tǒng)的四層負(fù)載均衡,也支持Zookeeper方式實(shí)現(xiàn)負(fù)載均衡。
- 四層負(fù)載均衡:根據(jù)生產(chǎn)者的IP地址和端口來為其圈定一個(gè)相關(guān)聯(lián)的Broker。通常,一個(gè)生產(chǎn)者只會(huì)對(duì)應(yīng)單個(gè)Broker,然后該生產(chǎn)者產(chǎn)生的消息都發(fā)送到該Broker。這種方式邏輯簡(jiǎn)單,每個(gè)生產(chǎn)者不需要同其他系統(tǒng)建立額外的TCP鏈接,只需要和Broker維護(hù)單個(gè)TCP連接即可。但是無法做到真正的負(fù)載均衡,因?yàn)閷?shí)際系統(tǒng)中的每個(gè)生產(chǎn)者產(chǎn)生的消息量及每個(gè)Broker的消息存儲(chǔ)量都是不一樣的,如果有些生產(chǎn)者產(chǎn)生的消息遠(yuǎn)多于其他生產(chǎn)者的話,那么會(huì)導(dǎo)致不同的Broker接收到的消息總數(shù)差異巨大,同時(shí),生產(chǎn)者也無法實(shí)時(shí)感知到Broker的新增和刪除。
- 使用Zookeeper進(jìn)行負(fù)載均衡,由于每個(gè)Broker啟動(dòng)時(shí),都會(huì)完成Broker注冊(cè)過程,生產(chǎn)者會(huì)通過該節(jié)點(diǎn)的變化來動(dòng)態(tài)地感知到Broker服務(wù)器列表的變更,這樣就可以實(shí)現(xiàn)動(dòng)態(tài)的負(fù)載均衡機(jī)制。
5.消費(fèi)者負(fù)載均衡:與生產(chǎn)者相似,Kafka中的消費(fèi)者同樣需要進(jìn)行負(fù)載均衡來實(shí)現(xiàn)多個(gè)消費(fèi)者合理地從對(duì)應(yīng)的Broker服務(wù)器上接收消息,每個(gè)消費(fèi)者分組包含若干消費(fèi)者,每條消息都只會(huì)發(fā)送給分組中的一個(gè)消費(fèi)者,不同的消費(fèi)者分組消費(fèi)自己特定的Topic下面的消息,互不干擾。
6.分區(qū)與消費(fèi)者的關(guān)系:消費(fèi)組consumer group下有多個(gè)Consumer(消費(fèi)者)。對(duì)于每個(gè)消費(fèi)者組(consumer group),Kafka都會(huì)為其分配一個(gè)全局唯一的Group ID,Group內(nèi)部的所有消費(fèi)者共享該ID。訂閱的topic下的每個(gè)分區(qū)只能分配給某個(gè)group下的一個(gè)consumer(當(dāng)然該分區(qū)還可以被分配給其他group) 同時(shí),kafka為每個(gè)消費(fèi)者分配一個(gè)Consumer ID,通常采用“Hostname:UUID”形式表示。在kafka中,規(guī)定了每個(gè)消息分區(qū)只能被同組的一個(gè)消費(fèi)者進(jìn)行消費(fèi),因此,需要在zookeeper上記錄消息分區(qū)與Consumer之間的關(guān)系,每個(gè)消費(fèi)者一旦確定了對(duì)一個(gè)消費(fèi)分區(qū)的消費(fèi)權(quán)利,需要將其Consumer ID寫入到平Zookeeper對(duì)應(yīng)消息分區(qū)的臨時(shí)節(jié)點(diǎn)上,例如:/consumers/[groupid]/owners/topic/[brokerid-partitionid] 其中,[brokerid-partition_id]就是一個(gè)消息分區(qū)的表示,節(jié)點(diǎn)內(nèi)容就是該消息分區(qū)上消費(fèi)者的Consumer ID。
7.補(bǔ)充:早期版本的 kafka 用 zk 做 meta 信息存儲(chǔ),consumer 的消費(fèi)狀態(tài),group 的管理以及 offse t的值。考慮到zk本身的一些因素以及整個(gè)架構(gòu)較大概率存在單點(diǎn)問題,新版本中確實(shí)逐漸弱化了zookeeper的作用。新的consumer使用了kafka內(nèi)部的group coordination協(xié)議,也減少了對(duì)zookeeper的依賴
9、Kafka如何快速讀取指定offset的消息
Kafka本地日志存儲(chǔ)根據(jù)segement分段存儲(chǔ),默認(rèn)1G,其中segement包括index稀疏索引文件和log數(shù)據(jù)文件。其中index文件索引通過offset與posttion來定位數(shù)據(jù)文件中指定message的消息。其中index和log的文件名都為當(dāng)前segement的起始o(jì)ffset。
讀取offset=170418的消息,首先通過offset根據(jù)二分法定位到index索引文件,然后根據(jù)索引文件中的[offset,position](position為物理偏移地址)去log中獲取指定offset的message數(shù)據(jù)。
10、生產(chǎn)者發(fā)送消息有哪些模式
異步發(fā)送
對(duì)于生產(chǎn)者的異步發(fā)送來說就是,我發(fā)送完當(dāng)前消息后,并不需要你將當(dāng)前消息的發(fā)送結(jié)果立馬告訴我,而是可以隨即進(jìn)行下一條消息的發(fā)送。但是我會(huì)允許添加一個(gè)回調(diào)函數(shù),接收你后續(xù)返回的發(fā)送結(jié)果。異步發(fā)送這塊我們直接調(diào)用kafkaProducer的send方法即可實(shí)現(xiàn)異步發(fā)送。
同步發(fā)送
如果生產(chǎn)者需要使用同步發(fā)送的方式,只需要拿到 send 方法返回的future對(duì)象后,調(diào)用其 get() 方法即可。此時(shí)如果消息還未發(fā)送到broker中,get方法會(huì)被阻塞,等到 broker 返回消息發(fā)送結(jié)果后會(huì)跳出當(dāng)前方法并將結(jié)果返回。
11、發(fā)送消息的分區(qū)策略有哪些
所謂分區(qū)寫入策略,即是生產(chǎn)者將數(shù)據(jù)寫入到kafka主題后,kafka如何將數(shù)據(jù)分配到不同分區(qū)中的策略。
常見的有三種策略,輪詢策略,隨機(jī)策略,和按鍵保存策略。其中輪詢策略是默認(rèn)的分區(qū)策略,而隨機(jī)策略則是較老版本的分區(qū)策略,不過由于其分配的均衡性不如輪詢策略,故而后來改成了輪詢策略為默認(rèn)策略。
輪詢策略
所謂輪詢策略,即按順序輪流將每條數(shù)據(jù)分配到每個(gè)分區(qū)中。
舉個(gè)例子,假設(shè)主題test有三個(gè)分區(qū),分別是分區(qū)A,分區(qū)B和分區(qū)C。那么主題對(duì)接收到的第一條消息寫入A分區(qū),第二條消息寫入B分區(qū),第三條消息寫入C分區(qū),第四條消息則又寫入A分區(qū),依此類推。
輪詢策略是默認(rèn)的策略,故而也是使用最頻繁的策略,它能最大限度保證所有消息都平均分配到每一個(gè)分區(qū)。除非有特殊的業(yè)務(wù)需求,否則使用這種方式即可。
隨機(jī)策略
隨機(jī)策略,也就是每次都隨機(jī)地將消息分配到每個(gè)分區(qū)。其實(shí)大概就是先得出分區(qū)的數(shù)量,然后每次獲取一個(gè)隨機(jī)數(shù),用該隨機(jī)數(shù)確定消息發(fā)送到哪個(gè)分區(qū)。
在比較早的版本,默認(rèn)的分區(qū)策略就是隨機(jī)策略,但其實(shí)使用隨機(jī)策略也是為了更好得將消息均衡寫入每個(gè)分區(qū)。但后來發(fā)現(xiàn)對(duì)這一需求而言,輪詢策略的表現(xiàn)更優(yōu),所以社區(qū)后來的默認(rèn)策略就是輪詢策略了。
hash(Key)
按鍵保存策略,就是當(dāng)生產(chǎn)者發(fā)送數(shù)據(jù)的時(shí)候,可以指定一個(gè)key,計(jì)算這個(gè)key的hashCode值,按照hashCode的值對(duì)不同消息進(jìn)行存儲(chǔ)。
至于要如何實(shí)現(xiàn),那也簡(jiǎn)單,只要讓生產(chǎn)者發(fā)送的時(shí)候指定key就行。欸剛剛不是說默認(rèn)的是輪詢策略嗎?其實(shí)啊,kafka默認(rèn)是實(shí)現(xiàn)了兩個(gè)策略,沒指定key的時(shí)候就是輪詢策略,有的話那激素按鍵保存策略了。
上面有說到一個(gè)場(chǎng)景,那就是要順序發(fā)送消息到kafka。前面提到的方案是讓所有數(shù)據(jù)存儲(chǔ)到一個(gè)分區(qū)中,但其實(shí)更好的做法,就是使用這種按鍵保存策略。
讓需要順序存儲(chǔ)的數(shù)據(jù)都指定相同的鍵,而不需要順序存儲(chǔ)的數(shù)據(jù)指定不同的鍵,這樣一來,即實(shí)現(xiàn)了順序存儲(chǔ)的需求,又能夠享受到kafka多分區(qū)的優(yōu)勢(shì),豈不美哉。
粘性分區(qū)
所以如果使用默認(rèn)的輪詢partition策略,可能會(huì)造成一個(gè)大的batch被輪詢成多個(gè)小的batch的情況。鑒于此,kafka2.4的時(shí)候推出一種新的分區(qū)策略,即StickyPartitioning Strategy,StickyPartitioning Strategy會(huì)隨機(jī)地選擇另一個(gè)分區(qū)并會(huì)盡可能地堅(jiān)持使用該分區(qū)——即所謂的粘住這個(gè)分區(qū)。
鑒于小batch可能導(dǎo)致延時(shí)增加,之前對(duì)于無Key消息的分區(qū)策略效率很低。社區(qū)于2.4版本引入了黏性分區(qū)策略(StickyPartitioning Strategy)。該策略是一種全新的策略,能夠顯著地降低給消息指定分區(qū)過程中的延時(shí)。使用StickyPartitioner有助于改進(jìn)消息批處理,減少延遲,并減少broker的負(fù)載。
自定義分區(qū)器
實(shí)現(xiàn)partitioner接口
切記分區(qū)是實(shí)現(xiàn)負(fù)載均衡以及高吞吐量的關(guān)鍵,所以一定要在生產(chǎn)者這一端就要考慮好合適的分區(qū)策略,避免造成消息數(shù)據(jù)的“傾斜”,使得某些分區(qū)成為性能瓶頸,從而導(dǎo)致下游數(shù)據(jù)消費(fèi)的性能下降的問題。
12、Kafka可靠性保證(不丟消息)
Kafka精確一次性(Exactly-once)保障之一
Kafka可靠性主要從三個(gè)方面來看,Broker、Producer、Consumer。1. Brokerbroker寫數(shù)據(jù)時(shí)首先寫到PageCache中,pageCache的數(shù)據(jù)通過linux的flusher程序異步批量存儲(chǔ)至磁盤中,此過程稱為刷盤。而pageCache位于內(nèi)存。這部分?jǐn)?shù)據(jù)會(huì)在斷電后丟失。刷盤觸發(fā)條件有三:
- 主動(dòng)調(diào)用sync或fsync函數(shù)
- 可用內(nèi)存低于閥值
- dirty data時(shí)間達(dá)到閥值。dirty是pagecache的一個(gè)標(biāo)識(shí)位,當(dāng)有數(shù)據(jù)寫入到pageCache時(shí),pagecache被標(biāo)注為dirty,數(shù)據(jù)刷盤以后,dirty標(biāo)志清除。
kafka沒有提供同步刷盤的方式,也就是說理論上要完全讓kafka保證單個(gè)broker不丟失消息是做不到的,只能通過調(diào)整刷盤機(jī)制的參數(shù)緩解該情況,比如:
減少刷盤間隔log.flush.interval.ms(在刷新到磁盤之前,任何topic中的消息保留在內(nèi)存中的最長(zhǎng)時(shí)間) 減少刷盤數(shù)據(jù)量大小log.flush.interval.messages(在將消息刷新到磁盤之前,在日志分區(qū)上累積的消息數(shù)量)。
時(shí)間越短,數(shù)據(jù)量越小,性能越差,但是丟失的數(shù)據(jù)會(huì)變少,可靠性越好。這是一個(gè)選擇題。
同時(shí),Kafka通過producer和broker協(xié)同處理消息丟失的情況,一旦producer發(fā)現(xiàn)broker消息丟失,即可自動(dòng)進(jìn)行retry。retry次數(shù)可根據(jù)參數(shù)retries進(jìn)行配置,超過指定次數(shù)會(huì),此條消息才會(huì)被判斷丟失。producer和broker之間,通過ack機(jī)制來判斷消息是否丟失。
- acks=0,producer不等待broker的響應(yīng),效率最高,但是消息很可能會(huì)丟。
- acks=1,leader broker收到消息后,不等待其他follower的響應(yīng),即返回ack。也可以理解為ack數(shù)為1。此時(shí),如果follower還沒有收到leader同步的消息leader就掛了,那么消息會(huì)丟失。按照上圖中的例子,如果leader收到消息,成功寫入PageCache后,會(huì)返回ack,此時(shí)producer認(rèn)為消息發(fā)送成功。但此時(shí),按照上圖,數(shù)據(jù)還沒有被同步到follower。如果此時(shí)leader斷電,數(shù)據(jù)會(huì)丟失。
- acks=-1,leader broker收到消息后,掛起,等待所有ISR列表中的follower返回結(jié)果后,再返回ack。-1等效與all。這種配置下,只有l(wèi)eader寫入數(shù)據(jù)到pagecache是不會(huì)返回ack的,還需要所有的ISR返回“成功”才會(huì)觸發(fā)ack。如果此時(shí)斷電,producer可以知道消息沒有被發(fā)送成功,將會(huì)重新發(fā)送。如果在follower收到數(shù)據(jù)以后,成功返回ack,leader斷電,數(shù)據(jù)將存在于原來的follower中。在重新選舉以后,新的leader會(huì)持有該部分?jǐn)?shù)據(jù)。數(shù)據(jù)從leader同步到follower,需要2步:
- 數(shù)據(jù)從pageCache被刷盤到disk。因?yàn)橹挥衐isk中的數(shù)據(jù)才能被同步到replica。
- 數(shù)據(jù)同步到replica,并且replica成功將數(shù)據(jù)寫入PageCache。在producer得到ack后,哪怕是所有機(jī)器都停電,數(shù)據(jù)也至少會(huì)存在于leader的磁盤內(nèi)。
- 上面第三點(diǎn)提到了ISR的列表的follower,需要配合另一個(gè)參數(shù)才能更好的保證ack的有效性。ISR是Broker維護(hù)的一個(gè)“可靠的follower列表”,in-sync Replica列表,broker的配置包含一個(gè)參數(shù):min.insync.replicas。該參數(shù)表示ISR中最少的副本數(shù)。如果不設(shè)置該值,ISR中的follower列表可能為空。此時(shí)相當(dāng)于acks=1。
Topic 分區(qū)副本
在 Kafka 0.8.0 之前,Kafka 是沒有副本的概念的,那時(shí)候人們只會(huì)用 Kafka 存儲(chǔ)一些不重要的數(shù)據(jù),因?yàn)闆]有副本,數(shù)據(jù)很可能會(huì)丟失。但是隨著業(yè)務(wù)的發(fā)展,支持副本的功能越來越強(qiáng)烈,所以為了保證數(shù)據(jù)的可靠性,Kafka 從 0.8.0 版本開始引入了分區(qū)副本(詳情請(qǐng)參見 KAFKA-50)。也就是說每個(gè)分區(qū)可以人為的配置幾個(gè)副本(比如創(chuàng)建主題的時(shí)候指定 replication-factor,也可以在 Broker 級(jí)別進(jìn)行配置 default.replication.factor),一般會(huì)設(shè)置為3。
Kafka 可以保證單個(gè)分區(qū)里的事件是有序的,分區(qū)可以在線(可用),也可以離線(不可用)。在眾多的分區(qū)副本里面有一個(gè)副本是 Leader,其余的副本是 follower,所有的讀寫操作都是經(jīng)過 Leader 進(jìn)行的,同時(shí) follower 會(huì)定期地去 leader 上的復(fù)制數(shù)據(jù)。當(dāng) Leader 掛了的時(shí)候,其中一個(gè) follower 會(huì)重新成為新的 Leader。通過分區(qū)副本,引入了數(shù)據(jù)冗余,同時(shí)也提供了 Kafka 的數(shù)據(jù)可靠性。
Kafka 的分區(qū)多副本架構(gòu)是 Kafka 可靠性保證的核心,把消息寫入多個(gè)副本可以使 Kafka 在發(fā)生崩潰時(shí)仍能保證消息的持久性。
2. Producer
producer在發(fā)送數(shù)據(jù)時(shí)可以將多個(gè)請(qǐng)求進(jìn)行合并后異步發(fā)送,合并后的請(qǐng)求首先緩存在本地buffer中,正常情況下,producer客戶端的異步調(diào)用可以通過callback回調(diào)函數(shù)來處理消息發(fā)送失敗或者超時(shí)的情況,但是當(dāng)出現(xiàn)以下情況,將會(huì)出現(xiàn)數(shù)據(jù)丟失
- producer異常中斷,buffer中的數(shù)據(jù)將丟失。
- producer客戶端內(nèi)存不足,如果采取的策略是丟棄消息(另一種策略是block阻塞),消息也會(huì)丟失。
- 消息產(chǎn)生(異步)過快,導(dǎo)致掛起線程過多,內(nèi)存不足,導(dǎo)致程序崩潰,消息丟失。
針對(duì)以上情況,可以有以下解決思路。
- producer采用同步方式發(fā)送消息,或者生產(chǎn)數(shù)據(jù)時(shí)采用阻塞的線程池,并且線程數(shù)不宜過多。整體思路就是控制消息產(chǎn)生速度。
- 擴(kuò)大buffer的容量配置,配置項(xiàng)為:buffer.memory。這種方法可以緩解數(shù)據(jù)丟失的情況,但不能杜絕。
3.Consumer
Consumer消費(fèi)消息有以下幾個(gè)步驟:
- 接收消息
- 處理消息
- 反饋處理結(jié)果
消費(fèi)方式主要分為兩種
- 自動(dòng)提交offset,Automatic Offset Committing (enable.auto.commit=true)
- 手動(dòng)提交offset,Manual Offset Control(enable.auto.commit=false)
Consumer自動(dòng)提交機(jī)制是根據(jù)一定的時(shí)間間隔,將收到的消息進(jìn)行commit,具體配置為:auto.commit.interval.ms。commit和消費(fèi)的過程是異步的,也就是說可能存在消費(fèi)過程未成功,commit消息就已經(jīng)提交,此時(shí)就會(huì)出現(xiàn)消息丟失。我們可將提交類型改為手動(dòng)提交,在消費(fèi)完成后再進(jìn)行提交,這樣可以保證消息“至少被消費(fèi)一次”(at least once),但如果消費(fèi)完成后在提交過程中出現(xiàn)故障,則會(huì)出現(xiàn)重復(fù)消費(fèi)的情況,本章不討論,下章講解。
13、Kafka 是怎么去實(shí)現(xiàn)負(fù)載均衡的
生產(chǎn)者層面
分區(qū)器是生產(chǎn)者層面的負(fù)載均衡。Kafka 生產(chǎn)者生產(chǎn)消息時(shí),根據(jù)分區(qū)器將消息投遞到指定的分區(qū)中,所以 Kafka 的負(fù)載均衡很大程度上依賴于分區(qū)器。Kafka 默認(rèn)的分區(qū)器是 Kafka 提供的 DefaultPartitioner。它的分區(qū)策略是根據(jù) Key 值進(jìn)行分區(qū)分配的:
如果 key 不為 null:對(duì) Key 值進(jìn)行 Hash 計(jì)算,從所有分區(qū)中根據(jù) Key 的 Hash 值計(jì)算出一個(gè)分區(qū)號(hào);擁有相同 Key 值的消息被寫入同一個(gè)分區(qū);如果 key 為 null:消息將以輪詢的方式,在所有可用分區(qū)中分別寫入消息。如果不想使用 Kafka 默認(rèn)的分區(qū)器,用戶可以實(shí)現(xiàn) Partitioner 接口,自行實(shí)現(xiàn)分區(qū)方法。
注:在筆者的理解中,分區(qū)器的負(fù)載均衡與順序性有著一定程度上的矛盾。
- 負(fù)載均衡的目的是將消息盡可能平均分配,對(duì)于 Kafka 而言,就是盡可能將消息平均分配給所有分區(qū);
- 如果使用 Kafka 保證順序性,則需要利用到 Kafka 的分區(qū)順序性的特性。
- 對(duì)于需要保證順序性的場(chǎng)景,通常會(huì)利用 Key 值實(shí)現(xiàn)分區(qū)順序性,那么所有 Key值相同的消息就會(huì)進(jìn)入同一個(gè)分區(qū)。這樣的情況下,對(duì)于大量擁有相同 Key值的消息,會(huì)涌入同一個(gè)分區(qū),導(dǎo)致一個(gè)分區(qū)消息過多,其他分區(qū)沒有消息的情況,即與負(fù)載均衡的思想相悖。
消費(fèi)者層面
主要根據(jù)消費(fèi)者的Rebalance機(jī)制實(shí)現(xiàn),內(nèi)容詳見下章
14、簡(jiǎn)述Kafka的Rebalance機(jī)制
什么是 Rebalance
Rebalance 本質(zhì)上是一種協(xié)議,規(guī)定了一個(gè) Consumer Group 下的所有 consumer 如何達(dá)成一致,來分配訂閱 Topic 的每個(gè)分區(qū)。 例如:某 Group 下有 20 個(gè) consumer 實(shí)例,它訂閱了一個(gè)具有 100 個(gè) partition 的 Topic。正常情況下,kafka 會(huì)為每個(gè) Consumer 平均的分配 5 個(gè)分區(qū)。這個(gè)分配的過程就是 Rebalance。
觸發(fā) Rebalance 的時(shí)機(jī)
Rebalance 的觸發(fā)條件有3個(gè)。
- 組成員個(gè)數(shù)發(fā)生變化。例如有新的 consumer 實(shí)例加入該消費(fèi)組或者離開組。
- 訂閱的 Topic 個(gè)數(shù)發(fā)生變化。
- 訂閱 Topic 的分區(qū)數(shù)發(fā)生變化。
Rebalance 發(fā)生時(shí),Group 下所有 consumer 實(shí)例都會(huì)協(xié)調(diào)在一起共同參與,kafka 能夠保證盡量達(dá)到最公平的分配。但是 Rebalance 過程對(duì) consumer group 會(huì)造成比較嚴(yán)重的影響。在 Rebalance 的過程中 consumer group 下的所有消費(fèi)者實(shí)例都會(huì)停止工作,等待 Rebalance 過程完成。
Rebalance 過程
Rebalance 過程分為兩步:JoinGroup 請(qǐng)求和 SyncGroup 請(qǐng)求。JoinGroup :JoinGroup 請(qǐng)求的主要作用是將組成員訂閱信息發(fā)送給領(lǐng)導(dǎo)者消費(fèi)者,待領(lǐng)導(dǎo)者制定好分配方案后,重平衡流程進(jìn)入到 SyncGroup 請(qǐng)求階段。SyncGroup:SyncGroup 請(qǐng)求的主要目的,就是讓協(xié)調(diào)者把領(lǐng)導(dǎo)者制定的分配方案下發(fā)給各個(gè)組內(nèi)成員。當(dāng)所有成員都成功接收到分配方案后,消費(fèi)者組進(jìn)入到 Stable 狀態(tài),即開始正常的消費(fèi)工作。
15、Kafka 負(fù)載均衡會(huì)導(dǎo)致什么問題
在消費(fèi)者組Rebalance期間,一直等到rebalance結(jié)束前,消費(fèi)者會(huì)出現(xiàn)無法讀取消息,造成整個(gè)消費(fèi)者組一段時(shí)間內(nèi)不可用。
16、如何增強(qiáng)消費(fèi)者的消費(fèi)能力
1、如果是Kafka消費(fèi)能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時(shí)提升消費(fèi)組的消費(fèi)者數(shù)量,消費(fèi)者數(shù)==分區(qū)數(shù)。兩者缺一不可。
2、如果是下游的數(shù)據(jù)處理不及時(shí):則提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過少(拉取數(shù)據(jù)/處理時(shí)間<生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會(huì)造成數(shù)據(jù)積壓。
3、優(yōu)化消費(fèi)者的處理邏輯,提高處理效率
17、消費(fèi)者與Topic的分區(qū)策略
Range
Range是對(duì)每個(gè)Topic而言的(即一個(gè)Topic一個(gè)Topic分),首先對(duì)同一個(gè)Topic里面的分區(qū)按照序號(hào)進(jìn)行排序,并對(duì)消費(fèi)者按照字母順序進(jìn)行排序。然后用Partitions分區(qū)的個(gè)數(shù)除以消費(fèi)者線程的總數(shù)來決定每個(gè)消費(fèi)者線程消費(fèi)幾個(gè)分區(qū)。如果除不盡,那么前面幾個(gè)消費(fèi)者線程將會(huì)多消費(fèi)一個(gè)分區(qū)。
RoundRobin
將消費(fèi)組內(nèi)所有消費(fèi)者以及消費(fèi)者所訂閱的所有topic的partition按照字典序排序,然后通過輪詢方式逐個(gè)將分區(qū)以此分配給每個(gè)消費(fèi)者。使用RoundRobin策略有兩個(gè)前提條件必須滿足:
- 同一個(gè)消費(fèi)者組里面的所有消費(fèi)者的num.streams(消費(fèi)者消費(fèi)線程數(shù))必須相等;
- 每個(gè)消費(fèi)者訂閱的主題必須相同。
StickyAssignor
無論是RangeAssignor,還是RoundRobinAssignor,當(dāng)前的分區(qū)分配算法都沒有考慮上一次的分配結(jié)果。顯然,在執(zhí)行一次新的分配之前,如果能考慮到上一次分配的結(jié)果,盡量少的調(diào)整分區(qū)分配的變動(dòng),顯然是能節(jié)省很多開銷的。
Sticky是“粘性的”,可以理解為分配結(jié)果是帶“粘性的”——每一次分配變更相對(duì)上一次分配做最少的變動(dòng)(上一次的結(jié)果是有粘性的),其目標(biāo)有兩點(diǎn):
- 分區(qū)的分配盡量的均衡
- 每一次重分配的結(jié)果盡量與上一次分配結(jié)果保持一致
StickyAssignor的模式比其他兩種提供更加均衡的分配結(jié)果,在發(fā)生Consumer或者Partition變更的情況下,也能減少不必要的分區(qū)調(diào)整。
18、如何保證消息不被重復(fù)消費(fèi)(消費(fèi)者冪等性)
Kafka精確一次性(Exactly-once)保障之一
冪等性:就是用戶對(duì)于同一操作發(fā)起的一次請(qǐng)求或者多次請(qǐng)求的結(jié)果是一致的,不會(huì)因?yàn)槎啻吸c(diǎn)擊而產(chǎn)生了副作用。
出現(xiàn)原因:
- 原因1:Consumer在消費(fèi)過程中,被強(qiáng)行kill掉消費(fèi)者線程或異常中斷(消費(fèi)系統(tǒng)宕機(jī)、重啟等),導(dǎo)致實(shí)際消費(fèi)后的數(shù)據(jù),offset沒有提交。
- 原因2:設(shè)置offset為自動(dòng)提交,關(guān)閉kafka時(shí),如果在close之前,調(diào)用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會(huì)重復(fù)消費(fèi)。
- 原因3:消費(fèi)超時(shí)導(dǎo)致消費(fèi)者與集群斷開連接,offset尚未提交,導(dǎo)致重平衡后重復(fù)消費(fèi)。一般消費(fèi)超時(shí)(session.time.out)有以下原因:并發(fā)過大,消費(fèi)者突然宕機(jī),處理超時(shí)等。
解決思路:
- 提高消費(fèi)能力,提高單條消息的處理速度,例如對(duì)消息處理中比 較耗時(shí)的步驟可通過異步的方式進(jìn)行處理、利用多線程處理等。
- 在縮短單條消息消費(fèi)時(shí)常的同時(shí),根據(jù)實(shí)際場(chǎng)景可將session.time.out(Consumer心跳超時(shí)時(shí)間)和max.poll.interval.ms(consumer兩次poll的最大時(shí)間間隔)值設(shè)置大一點(diǎn),避免不必要的rebalance,此外可適當(dāng)減小max.poll.records的值( 表示每次消費(fèi)的時(shí)候,獲取多少條消息),默認(rèn)值是500,可根據(jù)實(shí)際消息速率適當(dāng)調(diào)小。這種思路可解決因消費(fèi)時(shí)間過長(zhǎng)導(dǎo)致的重復(fù)消費(fèi)問題, 對(duì)代碼改動(dòng)較小,但無法絕對(duì)避免重復(fù)消費(fèi)問題。
- 根據(jù)業(yè)務(wù)情況制定:引入單獨(dú)去重機(jī)制,例如生成消息時(shí),在消息中加入唯一標(biāo)識(shí)符如主鍵id。寫入時(shí)根據(jù)逐漸主鍵判斷update還是insert。如果寫redis,則每次根據(jù)主鍵id進(jìn)行set即可,天然冪等性。或者使用redis作為緩沖,將id首先寫入redis進(jìn)行重復(fù)判斷,然后在進(jìn)行后續(xù)操作。
- 開啟生產(chǎn)者的精確一次性,也就是冪等性, 再引入producer事務(wù) ,即客戶端傳入一個(gè)全局唯一的Transaction ID,這樣即使本次會(huì)話掛掉也能根據(jù)這個(gè)id找到原來的事務(wù)狀態(tài)
19、為什么Kafka不支持讀寫分離
在 Kafka 中,生產(chǎn)者寫入消息、消費(fèi)者讀取消息的操作都是與 leader 副本進(jìn)行交互的,從 而實(shí)現(xiàn)的是一種主寫主讀的生產(chǎn)消費(fèi)模型。
Kafka 并不支持主寫從讀,因?yàn)橹鲗憦淖x有 2 個(gè)很明 顯的缺點(diǎn):
- 數(shù)據(jù)一致性問題。數(shù)據(jù)從主節(jié)點(diǎn)轉(zhuǎn)到從節(jié)點(diǎn)必然會(huì)有一個(gè)延時(shí)的時(shí)間窗口,這個(gè)時(shí)間 窗口會(huì)導(dǎo)致主從節(jié)點(diǎn)之間的數(shù)據(jù)不一致。某一時(shí)刻,在主節(jié)點(diǎn)和從節(jié)點(diǎn)中 A 數(shù)據(jù)的值都為 X, 之后將主節(jié)點(diǎn)中 A 的值修改為 Y,那么在這個(gè)變更通知到從節(jié)點(diǎn)之前,應(yīng)用讀取從節(jié)點(diǎn)中的 A 數(shù)據(jù)的值并不為最新的 Y,由此便產(chǎn)生了數(shù)據(jù)不一致的問題。
- 延時(shí)問題。類似 Redis 這種組件,數(shù)據(jù)從寫入主節(jié)點(diǎn)到同步至從節(jié)點(diǎn)中的過程需要經(jīng) 歷網(wǎng)絡(luò)→主節(jié)點(diǎn)內(nèi)存→網(wǎng)絡(luò)→從節(jié)點(diǎn)內(nèi)存這幾個(gè)階段,整個(gè)過程會(huì)耗費(fèi)一定的時(shí)間。而在 Kafka 中,主從同步會(huì)比 Redis 更加耗時(shí),它需要經(jīng)歷網(wǎng)絡(luò)→主節(jié)點(diǎn)內(nèi)存→主節(jié)點(diǎn)磁盤→網(wǎng)絡(luò)→從節(jié) 點(diǎn)內(nèi)存→從節(jié)點(diǎn)磁盤這幾個(gè)階段。對(duì)延時(shí)敏感的應(yīng)用而言,主寫從讀的功能并不太適用。
20、Kafka選舉機(jī)制
Kafka選舉主要分為以下三種:
- 控制器(Broker)選舉機(jī)制
- 分區(qū)副本選舉機(jī)制
- 消費(fèi)組選舉機(jī)制
控制器選舉
控制器是Kafka的核心組件,它的主要作用是在Zookeeper的幫助下管理和協(xié)調(diào)整個(gè)Kafka集群包括所有分區(qū)與副本的狀態(tài)。集群中任意一個(gè)Broker都能充當(dāng)控制器的角色,但在運(yùn)行過程中,只能有一個(gè)Broker成為控制器。集群中第一個(gè)啟動(dòng)的Broker會(huì)通過在Zookeeper中創(chuàng)建臨時(shí)節(jié)點(diǎn)/controller來讓自己成為控制器,其他Broker啟動(dòng)時(shí)也會(huì)在zookeeper中創(chuàng)建臨時(shí)節(jié)點(diǎn),但是發(fā)現(xiàn)節(jié)點(diǎn)已經(jīng)存在,所以它們會(huì)收到一個(gè)異常,意識(shí)到控制器已經(jīng)存在,那么就會(huì)在Zookeeper中創(chuàng)建watch對(duì)象,便于它們收到控制器變更的通知。如果控制器與Zookeeper斷開連接或異常退出,其他broker通過watch收到控制器變更的通知,就會(huì)嘗試創(chuàng)建臨時(shí)節(jié)點(diǎn)/controller,如果有一個(gè)Broker創(chuàng)建成功,那么其他broker就會(huì)收到創(chuàng)建異常通知,代表控制器已經(jīng)選舉成功,其他Broker只需創(chuàng)建watch對(duì)象即可。
控制器作用
- 主題管理:創(chuàng)建、刪除Topic,以及增加Topic分區(qū)等操作都是由控制器執(zhí)行。
- 分區(qū)重分配:執(zhí)行Kafka的reassign腳本對(duì)Topic分區(qū)重分配的操作,也是由控制器實(shí)現(xiàn)。如果集群中有一個(gè)Broker異常退出,控制器會(huì)檢查這個(gè)broker是否有分區(qū)的副本leader,如果有那么這個(gè)分區(qū)就需要一個(gè)新的leader,此時(shí)控制器就會(huì)去遍歷其他副本,決定哪一個(gè)成為新的leader,同時(shí)更新分區(qū)的ISR集合。如果有一個(gè)Broker加入集群中,那么控制器就會(huì)通過Broker ID去判斷新加入的Broker中是否含有現(xiàn)有分區(qū)的副本,如果有,就會(huì)從分區(qū)副本中去同步數(shù)據(jù)。
- Preferred leader選舉:因?yàn)樵贙afka集群長(zhǎng)時(shí)間運(yùn)行中,broker的宕機(jī)或崩潰是不可避免的,leader就會(huì)發(fā)生轉(zhuǎn)移,即使broker重新回來,也不會(huì)是leader了。在眾多l(xiāng)eader的轉(zhuǎn)移過程中,就會(huì)產(chǎn)生leader不均衡現(xiàn)象,可能一小部分broker上有大量的leader,影響了整個(gè)集群的性能,所以就需要把leader調(diào)整回最初的broker上,這就需要Preferred leader選舉。
- 集群成員管理:控制器能夠監(jiān)控新broker的增加,broker的主動(dòng)關(guān)閉與被動(dòng)宕機(jī),進(jìn)而做其他工作。這也是利用Zookeeper的ZNode模型和Watcher機(jī)制,控制器會(huì)監(jiān)聽Zookeeper中/brokers/ids下臨時(shí)節(jié)點(diǎn)的變化。同時(shí)對(duì)broker中的leader節(jié)點(diǎn)進(jìn)行調(diào)整。
- 元數(shù)據(jù)服務(wù):控制器上保存了最全的集群元數(shù)據(jù)信息,其他所有broker會(huì)定期接收控制器發(fā)來的元數(shù)據(jù)更新請(qǐng)求,從而更新其內(nèi)存中的緩存數(shù)據(jù)。
分區(qū)副本選舉機(jī)制
發(fā)生副本選舉的情況:
- 創(chuàng)建主題
- 增加分區(qū)
- 分區(qū)下線(分區(qū)中原先的leader副本下線,此時(shí)分區(qū)需要選舉一個(gè)新的leader上線來對(duì)外提供服務(wù))
- 分區(qū)重分配
分區(qū)leader副本的選舉由Kafka控制器負(fù)責(zé)具體實(shí)施。主要過程如下:
- 從Zookeeper中讀取當(dāng)前分區(qū)的所有ISR(in-sync replicas)集合。
- 調(diào)用配置的分區(qū)選擇算法選擇分區(qū)的leader。
分區(qū)副本分為ISR(同步副本)和OSR(非同步副本),當(dāng)leader發(fā)生故障時(shí),只有“同步副本”才可以被選舉為leader。選舉時(shí)按照集合中副本的順序查找第一個(gè)存活的副本,并且這個(gè)副本在ISR集合中。同時(shí)kafka支持OSR(非同步副本)也參加選舉,Kafka broker端提供了一個(gè)參數(shù)unclean.leader.election.enable,用于控制是否允許非同步副本參與leader選舉;如果開啟,則當(dāng) ISR為空時(shí)就會(huì)從這些副本中選舉新的leader,這個(gè)過程稱為 Unclean leader選舉??梢愿鶕?jù)實(shí)際的業(yè)務(wù)場(chǎng)景選擇是否開啟Unclean leader選舉。開啟 Unclean 領(lǐng)導(dǎo)者選舉可能會(huì)造成數(shù)據(jù)丟失,但好處是,它使得分區(qū) Leader 副本一直存在,不至于停止對(duì)外提供服務(wù),因此提升了高可用性。一般建議是關(guān)閉Unclean leader選舉,因?yàn)橥ǔ?shù)據(jù)的一致性要比可用性重要。
消費(fèi)組(Consumer Group)選主
在Kafka的消費(fèi)端,會(huì)有一個(gè)消費(fèi)者協(xié)調(diào)器以及消費(fèi)組,組協(xié)調(diào)器(Group Coordinator)需要為消費(fèi)組內(nèi)的消費(fèi)者選舉出一個(gè)消費(fèi)組的leader。如果消費(fèi)組內(nèi)還沒有l(wèi)eader,那么第一個(gè)加入消費(fèi)組的消費(fèi)者即為消費(fèi)組的leader,如果某一個(gè)時(shí)刻leader消費(fèi)者由于某些原因退出了消費(fèi)組,那么就會(huì)重新選舉leader,選舉源碼如下:
private val members = new mutable.HashMap[String, MemberMetadata]
leaderId = members.keys.headOption
在組協(xié)調(diào)器中消費(fèi)者的信息是以HashMap的形式存儲(chǔ)的,其中key為消費(fèi)者的member_id,而value是消費(fèi)者相關(guān)的元數(shù)據(jù)信息。而leader的取值為HashMap中的第一個(gè)鍵值對(duì)的key(這種選舉方式等同于隨機(jī))。
消費(fèi)組的Leader和Coordinator沒有關(guān)聯(lián)。消費(fèi)組的leader負(fù)責(zé)Rebalance過程中消費(fèi)分配方案的制定。
21、腦裂問題
controller掛掉后,Kafka集群會(huì)重新選舉一個(gè)新的controller。這里面存在一個(gè)問題,很難確定之前的controller節(jié)點(diǎn)是掛掉還是只是短暫性的故障。如果之前掛掉的controller又正常了,他并不知道自己已經(jīng)被取代了,那么此時(shí)集群中會(huì)出現(xiàn)兩臺(tái)controller。
其實(shí)這種情況是很容易發(fā)生。比如,某個(gè)controller由于GC而被認(rèn)為已經(jīng)掛掉,并選擇了一個(gè)新的controller。在GC的情況下,在最初的controller眼中,并沒有改變?nèi)魏螙|西,該Broker甚至不知道它已經(jīng)暫停了。因此,它將繼續(xù)充當(dāng)當(dāng)前controller,這是分布式系統(tǒng)中的常見情況,稱為腦裂。
假如,處于活躍狀態(tài)的controller進(jìn)入了長(zhǎng)時(shí)間的GC暫停。它的ZooKeeper會(huì)話過期了,之前注冊(cè)的/controller節(jié)點(diǎn)被刪除。集群中其他Broker會(huì)收到zookeeper的這一通知。
由于集群中必須存在一個(gè)controller Broker,所以現(xiàn)在每個(gè)Broker都試圖嘗試成為新的controller。假設(shè)Broker 2速度比較快,成為了最新的controller Broker。此時(shí),每個(gè)Broker會(huì)收到Broker2成為新的controller的通知,由于Broker3正在進(jìn)行"stop the world"的GC,可能不會(huì)收到Broker2成為最新的controller的通知。
等到Broker3的GC完成之后,仍會(huì)認(rèn)為自己是集群的controller,在Broker3的眼中好像什么都沒有發(fā)生一樣。
現(xiàn)在,集群中出現(xiàn)了兩個(gè)controller,它們可能一起發(fā)出具有沖突的命令,就會(huì)出現(xiàn)腦裂的現(xiàn)象。如果對(duì)這種情況不加以處理,可能會(huì)導(dǎo)致嚴(yán)重的不一致。所以需要一種方法來區(qū)分誰是集群當(dāng)前最新的Controller。
Kafka是通過使用epoch number(紀(jì)元編號(hào),也稱為隔離令牌)來完成的。epoch number只是單調(diào)遞增的數(shù)字,第一次選出Controller時(shí),epoch number值為1,如果再次選出新的Controller,則epoch number將為2,依次單調(diào)遞增。
每個(gè)新選出的controller通過Zookeeper 的條件遞增操作獲得一個(gè)全新的、數(shù)值更大的epoch number 。其他Broker 在知道當(dāng)前epoch number 后,如果收到由controller發(fā)出的包含較舊(較小)epoch number的消息,就會(huì)忽略它們,即Broker根據(jù)最大的epoch number來區(qū)分當(dāng)前最新的controller。
上圖,Broker3向Broker1發(fā)出命令:讓Broker1上的某個(gè)分區(qū)副本成為leader,該消息的epoch number值為1。于此同時(shí),Broker2也向Broker1發(fā)送了相同的命令,不同的是,該消息的epoch number值為2,此時(shí)Broker1只聽從Broker2的命令(由于其epoch number較大),會(huì)忽略Broker3的命令,從而避免腦裂的發(fā)生。
22、如何為Kafka集群選擇合適的
Topics/Partitions數(shù)量
1、根據(jù)當(dāng)前topic的消費(fèi)者數(shù)量確認(rèn)
在kafka中,單個(gè)patition是kafka并行操作的最小單元。在producer和broker端,向每一個(gè)分區(qū)寫入數(shù)據(jù)是可以完全并行化的,此時(shí),可以通過加大硬件資源的利用率來提升系統(tǒng)的吞吐量,例如對(duì)數(shù)據(jù)進(jìn)行壓縮。在consumer段,kafka只允許單個(gè)partition的數(shù)據(jù)被一個(gè)consumer線程消費(fèi)。因此,在consumer端,每一個(gè)Consumer Group內(nèi)部的consumer并行度完全依賴于被消費(fèi)的分區(qū)數(shù)量。綜上所述,通常情況下,在一個(gè)Kafka集群中,partition的數(shù)量越多,意味著可以到達(dá)的吞吐量越大。
2、根據(jù)consumer端的最大吞吐量確定
我們可以粗略地通過吞吐量來計(jì)算kafka集群的分區(qū)數(shù)量。假設(shè)對(duì)于單個(gè)partition,producer端的可達(dá)吞吐量為p,Consumer端的可達(dá)吞吐量為c,期望的目標(biāo)吞吐量為t,那么集群所需要的partition數(shù)量至少為max(t/p,t/c)。在producer端,單個(gè)分區(qū)的吞吐量大小會(huì)受到批量大小、數(shù)據(jù)壓縮方法、 確認(rèn)類型(同步/異步)、復(fù)制因子等配置參數(shù)的影響。經(jīng)過測(cè)試,在producer端,單個(gè)partition的吞吐量通常是在10MB/s左右。在consumer端,單個(gè)partition的吞吐量依賴于consumer端每個(gè)消息的應(yīng)用邏輯處理速度。因此,我們需要對(duì)consumer端的吞吐量進(jìn)行測(cè)量。
23、Kafka 分區(qū)數(shù)可以增加或減少嗎,為什么
kafka支持分區(qū)數(shù)增加
例如我們可以使用 bin/kafka-topics.sh -alter --topic --topic topic-name --partitions 3 命令將原本分區(qū)數(shù)為1得topic-name設(shè)置為3。當(dāng)主題中的消息包含有key時(shí)(即key不為null),根據(jù)key來計(jì)算分區(qū)的行為就會(huì)有所影響。當(dāng)topic-config的分區(qū)數(shù)為1時(shí),不管消息的key為何值,消息都會(huì)發(fā)往這一個(gè)分區(qū)中;當(dāng)分區(qū)數(shù)增加到3時(shí),那么就會(huì)根據(jù)消息的key來計(jì)算分區(qū)號(hào),原本發(fā)往分區(qū)0的消息現(xiàn)在有可能會(huì)發(fā)往分區(qū)1或者分區(qū)2中。如此還會(huì)影響既定消息的順序,所以在增加分區(qū)數(shù)時(shí)一定要三思而后行。對(duì)于基于key計(jì)算的主題而言,建議在一開始就設(shè)置好分區(qū)數(shù)量,避免以后對(duì)其進(jìn)行調(diào)整。
Kafka 不支持減少分區(qū)數(shù)。
按照Kafka現(xiàn)有的代碼邏輯而言,此功能完全可以實(shí)現(xiàn),不過也會(huì)使得代碼的復(fù)雜度急劇增大。實(shí)現(xiàn)此功能需要考慮的因素很多,比如刪除掉的分區(qū)中的消息該作何處理?如果隨著分區(qū)一起消失則消息的可靠性得不到保障;如果需要保留則又需要考慮如何保留。直接存儲(chǔ)到現(xiàn)有分區(qū)的尾部,消息的時(shí)間戳就不會(huì)遞增,如此對(duì)于Spark、Flink這類需要消息時(shí)間戳(事件時(shí)間)的組件將會(huì)受到影響;如果分散插入到現(xiàn)有的分區(qū)中,那么在消息量很大的時(shí)候,內(nèi)部的數(shù)據(jù)復(fù)制會(huì)占用很大的資源,而且在復(fù)制期間,此主題的可用性又如何得到保障?與此同時(shí),順序性問題、事務(wù)性問題、以及分區(qū)和副本的狀態(tài)機(jī)切換問題都是不得不面對(duì)的。反觀這個(gè)功能的收益點(diǎn)卻是很低,如果真的需要實(shí)現(xiàn)此類的功能,完全可以重新創(chuàng)建一個(gè)分區(qū)數(shù)較小的主題,然后將現(xiàn)有主題中的消息按照既定的邏輯復(fù)制過去即可。
24、談?wù)勀銓?duì)Kafka生產(chǎn)者冪等性的了解
Kafka精確一次性(Exactly-once)保障之一
生產(chǎn)者冪等性主要避免生產(chǎn)者數(shù)據(jù)重復(fù)提交至Kafka broker中并落盤。在正常情況下,Producer向Broker發(fā)送消息,Broker將消息追加寫到對(duì)應(yīng)的流(即某一Topic的某一Partition)中并落盤,并向Producer返回ACK信號(hào),表示確認(rèn)收到。但是Producer和Broker之間的通信總有可能出現(xiàn)異常,如果消息已經(jīng)寫入,但ACK在半途丟失了,Producer就會(huì)進(jìn)行retry操作再次發(fā)送該消息,造成重復(fù)寫入。
為了實(shí)現(xiàn)Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。
- PID。每個(gè)新的Producer在初始化的時(shí)候會(huì)被分配一個(gè)唯一的PID,這個(gè)PID對(duì)用戶是不可見的。
- Sequence Numbler。對(duì)于每個(gè)PID,該P(yáng)roducer發(fā)送數(shù)據(jù)的每個(gè)都對(duì)應(yīng)一個(gè)從0開始單調(diào)遞增的Sequence Number
- Broker端在緩存中保存了這seq number,對(duì)于接收的每條消息,如果其序號(hào)比Broker緩存中序號(hào)大于1則接受它,否則將其丟棄,這樣就可以實(shí)現(xiàn)了消息重復(fù)提交了.但是只能保證單個(gè)Producer對(duì)于同一個(gè)的Exactly Once語義
Producer使用冪等性的示例非常簡(jiǎn)單,與正常情況下Producer使用相比變化不大,只需要 把Producer的配置enable.idempotence設(shè)置為true即可,如下所示:
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
//當(dāng)enable.idempotence為true時(shí)acks默認(rèn)為 all
// props.put("acks", "all");
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");
Prodcuer 冪等性對(duì)外保留的接口非常簡(jiǎn)單,其底層的實(shí)現(xiàn)對(duì)上層應(yīng)用做了很好的封裝,應(yīng)用層并不需要去關(guān)心具體的實(shí)現(xiàn)細(xì)節(jié),對(duì)用戶非常友好
Kafka的冪等性實(shí)現(xiàn)了對(duì)于單個(gè)Producer會(huì)話、單個(gè)TopicPartition級(jí)別的不重不漏,也就是最細(xì)粒度的保證。如果Producer重啟(PID發(fā)生變化),或者寫入是跨Topic、跨Partition的,單純的冪等性就會(huì)失效,需要更高級(jí)別的事務(wù)性來解決了。當(dāng)然事務(wù)性的原理更加復(fù)雜
25、談?wù)勀銓?duì) Kafka事務(wù)的了解
冪等性可以保證單個(gè)Producer會(huì)話、單個(gè)TopicPartition、單個(gè)會(huì)話session的不重不漏,如果Producer重啟,或者是寫入跨Topic、跨Partition的消息,冪等性無法保證。此時(shí)需要用到Kafka事務(wù)。Kafka 的事務(wù)處理,主要是允許應(yīng)用可以把消費(fèi)和生產(chǎn)的 batch 處理(涉及多個(gè) Partition)在一個(gè)原子單元內(nèi)完成,操作要么全部完成、要么全部失敗。為了實(shí)現(xiàn)這種機(jī)制,我們需要應(yīng)用能提供一個(gè)唯一 id,即使故障恢復(fù)后也不會(huì)改變,這個(gè) id 就是 TransactionnalId(也叫 txn.id),txn.id 可以跟內(nèi)部的 PID 1:1 分配,它們不同的是 txn.id 是用戶提供的,而 PID 是 Producer 內(nèi)部自動(dòng)生成的(并且故障恢復(fù)后這個(gè) PID 會(huì)變化),有了 txn.id 這個(gè)機(jī)制,就可以實(shí)現(xiàn)多 partition、跨會(huì)話的 EOS 語義。當(dāng)用戶使用 Kafka 的事務(wù)性時(shí),Kafka 可以做到的保證:
- 跨會(huì)話的冪等性寫入:即使中間故障,恢復(fù)后依然可以保持冪等性;
- 跨會(huì)話的事務(wù)恢復(fù):如果一個(gè)應(yīng)用實(shí)例掛了,啟動(dòng)的下一個(gè)實(shí)例依然可以保證上一個(gè)事務(wù)完成(commit 或者 abort);
- 跨多個(gè) Topic-Partition 的冪等性寫入,Kafka 可以保證跨多個(gè) Topic-Partition 的數(shù)據(jù)要么全部寫入成功,要么全部失敗,不會(huì)出現(xiàn)中間狀態(tài)。
事務(wù)性示例
Kafka 事務(wù)性的使用方法也非常簡(jiǎn)單,用戶只需要在 Producer 的配置中配置 transactional.id,通過 initTransactions() 初始化事務(wù)狀態(tài)信息,再通過 beginTransaction() 標(biāo)識(shí)一個(gè)事務(wù)的開始,然后通過 commitTransaction() 或 abortTransaction() 對(duì)事務(wù)進(jìn)行 commit 或 abort,示例如下所示:生產(chǎn)者:
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
try {String msg = "matt test";producer.beginTransaction();producer.send(new ProducerRecord(topic, "0", msg.toString()));producer.send(new ProducerRecord(topic, "1", msg.toString()));producer.send(new ProducerRecord(topic, "2", msg.toString()));producer.commitTransaction();
} catch (ProducerFencedException e1) {e1.printStackTrace();producer.close();
} catch (KafkaException e2) {e2.printStackTrace();producer.abortTransaction();
}
producer.close();
消費(fèi)者:消費(fèi)者應(yīng)該設(shè)置提交事務(wù)的隔離級(jí)別
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
Kafka中只有兩種事務(wù)隔離級(jí)別:readcommitted、readuncommitted 設(shè)置為readcommitted時(shí)候是生產(chǎn)者事務(wù)已提交的數(shù)據(jù)才能讀取到。在執(zhí)行 commitTransaction() 或 abortTransaction() 方法前,設(shè)置為“readcommitted”的消費(fèi)端應(yīng)用是消費(fèi)不到這些消息的,不過在 KafkaConsumer 內(nèi)部會(huì)緩存這些消息,直到生產(chǎn)者執(zhí)行 commitTransaction() 方法之后它才能將這些消息推送給消費(fèi)端應(yīng)用。同時(shí)KafkaConsumer會(huì)根據(jù)分區(qū)對(duì)數(shù)據(jù)進(jìn)行整合,推送時(shí)按照分區(qū)順序進(jìn)行推送。而不是按照數(shù)據(jù)發(fā)送順序。反之,如果生產(chǎn)者執(zhí)行了 abortTransaction() 方法,那么 KafkaConsumer 會(huì)將這些緩存的消息丟棄而不推送給消費(fèi)端應(yīng)用。設(shè)置為read_uncommitted時(shí)候可以讀取到未提交的數(shù)據(jù)(報(bào)錯(cuò)終止前的數(shù)據(jù))
26、Kafka消息是采用Pull模式,還是Push模式
push模式下,消費(fèi)者速率主要由生產(chǎn)者決定,當(dāng)消息生產(chǎn)速率遠(yuǎn)大于消費(fèi)速率,消費(fèi)者容易崩潰,如果為了避免consumer崩潰而采用較低的推送速率,將可能導(dǎo)致一次只推送較少的消息而造成浪費(fèi)。Pull模式可以根據(jù)自己的消費(fèi)能力拉取數(shù)據(jù)。Push模式必須在不知道下游consumer消費(fèi)能力和消費(fèi)策略的情況下決定是立即推送每條消息還是緩存之后批量推送。Pull有個(gè)缺點(diǎn)是,如果broker沒有可供消費(fèi)的消息,將導(dǎo)致consumer不斷輪詢。但是可以在消費(fèi)者設(shè)置輪詢間隔。
27、Kafka缺點(diǎn)
- 由于是批量發(fā)送,數(shù)據(jù)并非真正的實(shí)時(shí);
- 對(duì)于mqtt協(xié)議不支持;
- 不支持物聯(lián)網(wǎng)傳感數(shù)據(jù)直接接入;
- 僅支持統(tǒng)一分區(qū)內(nèi)消息有序,無法實(shí)現(xiàn)全局消息有序;
- 監(jiān)控不完善,需要安裝插件;
- 依賴zookeeper進(jìn)行元數(shù)據(jù)管理;3.0版本去除
28、Kafka什么時(shí)候會(huì)丟數(shù)據(jù)
broker端消費(fèi)丟失
broker端的消息不丟失,其實(shí)就是用partition副本機(jī)制來保證。
- unclean.leader.election為true,且選舉出的首領(lǐng)分區(qū)為OSR時(shí) 可能就會(huì)發(fā)生消息丟失
- min.insync.replicas為N,則至少要存在N個(gè)同步副本才能向分區(qū)寫入數(shù)據(jù)。如果同步副本數(shù)量小于N時(shí)broker就會(huì)停止接收所有生產(chǎn)者的消息、生產(chǎn)者會(huì)出現(xiàn)異常,如果無法正確處理異常,則消息丟失。此時(shí)消費(fèi)者仍然可以讀取已有數(shù)據(jù)、變成只讀狀態(tài)。如果Topic只有一個(gè)同步副本,那么在這個(gè)副本變?yōu)椴豢捎脮r(shí),數(shù)據(jù)就可能會(huì)丟失。
- kafka的數(shù)據(jù)一開始是存儲(chǔ)在PageCache并定期flush到磁盤上的,如果出現(xiàn)斷電或者機(jī)器故障等,PageCache上的數(shù)據(jù)就丟失了。
生產(chǎn)者端
- ack有3種狀態(tài)保證消息被安全生產(chǎn) ack=0,消息傳輸?shù)紹roker端沒收到Broker的反饋即發(fā)送下一條,網(wǎng)絡(luò)故障導(dǎo)致小東西丟失。ack=1,如果剛好leader partition掛了,數(shù)據(jù)就會(huì)丟失。ack=all,min.insync.replicas如果小于N或者Topic只有一個(gè)同步副本。
- 消息重試機(jī)制未開啟。
- 當(dāng)前消息過大,超過max.request.size大小,默認(rèn)為1MB
- 生產(chǎn)者速率超過消費(fèi)者,緩存池空間占滿后,生產(chǎn)線程阻塞超過最大時(shí)間,此時(shí)生產(chǎn)者會(huì)拋出異常,如果沒有處理好則會(huì)丟失數(shù)據(jù)。
消費(fèi)者端
enable.auto.commit=true,消費(fèi)在處理之前提交了offset,則處理異??赡軙?huì)造成消息的丟失。enable.auto.commit=false,Consumer手動(dòng)批量提交位點(diǎn),在批量位點(diǎn)中某個(gè)位點(diǎn)數(shù)據(jù)異常時(shí),沒有正確處理異常,而是將批量位點(diǎn)的最后一個(gè)位點(diǎn)提交,導(dǎo)致異常數(shù)據(jù)丟失
29、Kafka分區(qū)數(shù)越多越好嗎
并非分區(qū)數(shù)量越多,效率越高
- Topic 每個(gè) partition 在 Kafka 路徑下都有一個(gè)自己的目錄,該目錄下有兩個(gè)主要的文件:base_offset.log 和 base_offset.index。Kafka 服務(wù)端的 ReplicaManager 會(huì)為每個(gè) Broker 節(jié)點(diǎn)保存每個(gè)分區(qū)的這兩個(gè)文件的文件句柄。所以如果分區(qū)過多,ReplicaManager 需要保持打開狀態(tài)的文件句柄數(shù)也就會(huì)很多。
- 每個(gè) Producer, Consumer 進(jìn)程都會(huì)為分區(qū)緩存消息,如果分區(qū)過多,緩存的消息越多,占用的內(nèi)存就越大;
- n 個(gè)分區(qū)有 1 個(gè) Leader,(n-1) 個(gè) Follower,如果運(yùn)行過程中 Leader 掛了,則會(huì)從剩余 (n-1) 個(gè) Followers 中選舉新 Leader;如果有成千上萬個(gè)分區(qū),那么需要很長(zhǎng)時(shí)間的選舉,消耗較大的性能。
30、Kafka如何保證消息的有序性
單分區(qū)
Kafka在特定條件下可以保障單分區(qū)消息的有序性
kafka在發(fā)送消息過程中,正常情況下是有序的,如果消息出現(xiàn)重試,則會(huì)造成消息亂序。導(dǎo)致亂序的原因是:max.in.flight.requests.per.connection默認(rèn)值為5。
該參數(shù)指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前,請(qǐng)求隊(duì)列中可以提交多少個(gè)請(qǐng)求,用于提高網(wǎng)絡(luò)吞吐量。
圖中,batch1-5在請(qǐng)求隊(duì)列中,batch1作為最新數(shù)據(jù)進(jìn)行提交,提交失敗后如果開啟重試機(jī)制,則batch1會(huì)重新添加到本地緩沖池的頭部,然后提交至請(qǐng)求隊(duì)列中重新發(fā)送。此時(shí)batch1的順序會(huì)排在batch5之后,發(fā)生了亂序。
解決方式是將max.in.flight.requests.per.connection設(shè)置為1,消息隊(duì)列中只允許有一個(gè)請(qǐng)求,這樣消息失敗后,可以第一時(shí)間發(fā)送,不會(huì)產(chǎn)生亂序,但是會(huì)降低網(wǎng)絡(luò)吞吐量。
或者開啟生產(chǎn)者冪等性設(shè)置,開啟后,該P(yáng)roducer發(fā)送的消息都對(duì)應(yīng)一個(gè)單調(diào)增的Sequence Number。同樣的Broker端也會(huì)為每個(gè)生產(chǎn)者的每條消息維護(hù)一個(gè)序號(hào),并且每commit一條數(shù)據(jù)時(shí)就會(huì)將其序號(hào)遞增。對(duì)于接收到的數(shù)據(jù),如果其序號(hào)比Borker維護(hù)的序號(hào)大一(即表示是下一條數(shù)據(jù)),Broker會(huì)接收它,否則將其丟棄。如果消息序號(hào)比Broker維護(hù)的序號(hào)差值比一大,說明中間有數(shù)據(jù)尚未寫入,即亂序,此時(shí)Broker拒絕該消息,Producer拋出InvalidSequenceNumber 如果消息序號(hào)小于等于Broker維護(hù)的序號(hào),說明該消息已被保存,即為重復(fù)消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber Sender發(fā)送失敗后會(huì)重試,這樣可以保證每個(gè)消息都被發(fā)送到broker
多分區(qū)
Kafka本身無法保障多分區(qū)的有序性,可以通過業(yè)務(wù)設(shè)計(jì)進(jìn)行保證,例如需要單表數(shù)據(jù)通過自定義partition的方式發(fā)送至同一個(gè)分區(qū)
31、Kafka精確一次性(Exactly-once)如何保證
宏觀上:可靠性 + at least once + 冪等性
具體實(shí)現(xiàn):Kafka不丟消息-生產(chǎn)者冪等性-消費(fèi)者冪等性
詳見目錄:
12、Kafka可靠性保證(不丟消息)
18、如何保證消息不被重復(fù)消費(fèi)(消費(fèi)者冪等性)
24、談?wù)勀銓?duì)Kafka生產(chǎn)者冪等性的了解?