国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

做測(cè)試題的網(wǎng)站關(guān)鍵詞優(yōu)化的最佳方法

做測(cè)試題的網(wǎng)站,關(guān)鍵詞優(yōu)化的最佳方法,礦業(yè)公司網(wǎng)站源碼,飾品網(wǎng)站設(shè)計(jì)方案1、 kafka 是什么,有什么作用 2、Kafka為什么這么快 3、Kafka架構(gòu)及名詞解釋 4、Kafka中的AR、ISR、OSR代表什么 5、HW、LEO代表什么 6、ISR收縮性 7、kafka follower如何與leader同步數(shù)據(jù) 8、Zookeeper 在 Kafka 中的作用(早期) 9、Kafka如何快…

1、 kafka 是什么,有什么作用

2、Kafka為什么這么快

3、Kafka架構(gòu)及名詞解釋

4、Kafka中的AR、ISR、OSR代表什么

5、HW、LEO代表什么

6、ISR收縮性

7、kafka follower如何與leader同步數(shù)據(jù)

8、Zookeeper 在 Kafka 中的作用(早期)

9、Kafka如何快速讀取指定offset的消息

10、生產(chǎn)者發(fā)送消息有哪些模式

11、發(fā)送消息的分區(qū)策略有哪些

12、Kafka可靠性保證(不丟消息)

13、Kafka 是怎么去實(shí)現(xiàn)負(fù)載均衡的

14、簡(jiǎn)述Kafka的Rebalance機(jī)制

15、Kafka 負(fù)載均衡會(huì)導(dǎo)致什么問題

16、如何增強(qiáng)消費(fèi)者的消費(fèi)能力

17、消費(fèi)者與Topic的分區(qū)策略

18、如何保證消息不被重復(fù)消費(fèi)(消費(fèi)者冪等性)

19、為什么Kafka不支持讀寫分離

20、Kafka選舉機(jī)制

21、腦裂問題

22、如何為Kafka集群選擇合適的Topics/Partitions數(shù)量

23、Kafka 分區(qū)數(shù)可以增加或減少嗎?為什么

24、談?wù)勀銓?duì)Kafka生產(chǎn)者冪等性的了解

25、談?wù)勀銓?duì) Kafka事務(wù)的了解?

26、Kafka消息是采用Pull模式,還是Push模式?

27、Kafka缺點(diǎn)

28、Kafka什么時(shí)候會(huì)丟數(shù)據(jù)

29、Kafka分區(qū)數(shù)越多越好嗎?

30、Kafka如何保證消息的有序性

31、Kafka精確一次性(Exactly-once)如何保證

1、 kafka 是什么,有什么作用

Kafka是一個(gè)開源的高吞吐量的分布式消息中間件,對(duì)比于其他 1) 緩沖和削峰:上游數(shù)據(jù)時(shí)有突發(fā)流量,下游可能扛不住,或者下游沒有足夠多的機(jī)器來保證冗余,kafka在中間可以起到一個(gè)緩沖的作用,把消息暫存在kafka中,下游服務(wù)就可以按照自己的節(jié)奏進(jìn)行慢慢處理。

1) 解耦和擴(kuò)展性:項(xiàng)目開始的時(shí)候,并不能確定具體需求。消息隊(duì)列可以作為一個(gè)接口層,解耦重要的業(yè)務(wù)流程。只需要遵守約定,針對(duì)數(shù)據(jù)編程即可獲取擴(kuò)展能力。

1) 冗余:可以采用一對(duì)多的方式,一個(gè)生產(chǎn)者發(fā)布消息,可以被多個(gè)訂閱topic的服務(wù)消費(fèi)到,供多個(gè)毫無關(guān)聯(lián)的業(yè)務(wù)使用。

1) 健壯性:消息隊(duì)列可以堆積請(qǐng)求,所以消費(fèi)端業(yè)務(wù)即使短時(shí)間死掉,也不會(huì)影響主要業(yè)務(wù)的正常進(jìn)行。

1) 異步通信:很多時(shí)候,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時(shí)候再去處理它們。

2、Kafka為什么這么快

  • 利用 Partition 實(shí)現(xiàn)并行處理 不同 Partition 可位于不同機(jī)器,因此可以充分利用集群優(yōu)勢(shì),實(shí)現(xiàn)機(jī)器間的并行處理。另一方面,由于 Partition 在物理上對(duì)應(yīng)一個(gè)文件夾,即使多個(gè) Partition 位于同一個(gè)節(jié)點(diǎn),也可通過配置讓同一節(jié)點(diǎn)上的不同 Partition 置于不同的磁盤上,從而實(shí)現(xiàn)磁盤間的并行處理,充分發(fā)揮多磁盤的優(yōu)勢(shì)。
  • 利用了現(xiàn)代操作系統(tǒng)分頁存儲(chǔ) Page Cache 來利用內(nèi)存提高 I/O 效率
  • 順序?qū)?kafka的消息是不斷追加到文件中的,這個(gè)特性使kafka可以充分利用磁盤的順序讀寫性能 由于現(xiàn)代的操作系統(tǒng)提供了預(yù)讀和寫技術(shù),磁盤的順序?qū)懘蠖鄶?shù)情況下比隨機(jī)寫內(nèi)存還要快。順序讀寫不需要硬盤磁頭的尋道時(shí)間,只需很少的扇區(qū)旋轉(zhuǎn)時(shí)間,所以速度遠(yuǎn)快于隨機(jī)讀寫
  • Zero-copy 零拷技術(shù)減少拷貝次數(shù)
  • 數(shù)據(jù)批量處理。合并小的請(qǐng)求,然后以流的方式進(jìn)行交互,直頂網(wǎng)絡(luò)上限。在很多情況下,系統(tǒng)的瓶頸不是 CPU 或磁盤,而是網(wǎng)絡(luò)IO。因此,除了操作系統(tǒng)提供的低級(jí)批處理之外,Kafka 的客戶端和 broker 還會(huì)在通過網(wǎng)絡(luò)發(fā)送數(shù)據(jù)之前,在一個(gè)批處理中累積多條記錄 (包括讀和寫)。記錄的批處理分?jǐn)偭司W(wǎng)絡(luò)往返的開銷,使用了更大的數(shù)據(jù)包從而提高了帶寬利用率。
  • Pull 拉模式 使用拉模式進(jìn)行消息的獲取消費(fèi),與消費(fèi)端處理能力相符。
  • 數(shù)據(jù)壓縮 Kafka還支持對(duì)消息集合進(jìn)行壓縮,Producer可以通過GZIP、Snappy、LZ4格式對(duì)消息集合進(jìn)行壓縮,數(shù)據(jù)壓縮一般都是和批處理配套使用來作為優(yōu)化手段的。壓縮的好處就是減少傳輸?shù)臄?shù)據(jù)量,減輕對(duì)網(wǎng)絡(luò)傳輸?shù)膲毫?Producer壓縮之后,在Consumer需進(jìn)行解壓,雖然增加了CPU的工作,但在對(duì)大數(shù)據(jù)處理上,瓶頸在網(wǎng)絡(luò)上而不是CPU,所以這個(gè)成本很值得

3、Kafka架構(gòu)及名詞解釋

簡(jiǎn)易架構(gòu)圖如下:

詳細(xì)架構(gòu)圖如下

  • Broker :一臺(tái)kafka服務(wù)器就是一個(gè)broker。一個(gè)集群由多個(gè)broker組成。一個(gè)broker可以容納多個(gè)topic。
  • Producer:消息生產(chǎn)者,向kafka broker發(fā)送消息的客戶端。
  • Consumer:消息消費(fèi)者,向kafka broker取消息的客戶端。
  • Topic:隊(duì)列,生產(chǎn)者和消費(fèi)者通過此進(jìn)行對(duì)接。
  • Consumer Group (CG):若干個(gè)Consumer組成的集合。這是kafka用來實(shí)現(xiàn)一個(gè)topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個(gè)consumer)的手段。一個(gè)topic可以有多個(gè)CG。topic的消息會(huì)復(fù)制(不是真的復(fù)制,是概念上的)到所有的CG,但每個(gè)CG只會(huì)把消息發(fā)給該CG中的一個(gè)consumer。如果需要實(shí)現(xiàn)廣播,只要每個(gè)consumer有一個(gè)獨(dú)立的CG就可以了。要實(shí)現(xiàn)單播只要所有的consumer在同一個(gè)CG。用CG還可以將consumer進(jìn)行自由的分組而不需要多次發(fā)送消息到不同的topic。
  • Partition:分區(qū),為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)topic可以分布在多個(gè)broker上,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition都是一個(gè)有序的隊(duì)列。partition中的每條消息都會(huì)被分配一個(gè)有序的id(offset)。kafka只保證同一個(gè)partition中的消息順序,不保證一個(gè)topic的整體(多個(gè)partition之間)的順序。生產(chǎn)者和消費(fèi)者使用時(shí)可以指定topic中的具體partition。
  • 副本:在kafka中,每個(gè)主題可以有多個(gè)分區(qū),每個(gè)分區(qū)又可以有多個(gè)副本。這多個(gè)副本中,只有一個(gè)是leader,而其他的都是follower副本。僅有l(wèi)eader副本可以對(duì)外提供服務(wù)。多個(gè)follower副本通常存放在和leader副本不同的broker中。通過這樣的機(jī)制實(shí)現(xiàn)了高可用,當(dāng)某臺(tái)機(jī)器掛掉后,其他follower副本也能迅速”轉(zhuǎn)正“,開始對(duì)外提供服務(wù)。
  • offset:消費(fèi)偏移量,topic中的每個(gè)分區(qū)都是有序且順序不可變的記錄集,并且不斷地追加到結(jié)構(gòu)化的log文件。分區(qū)中的每一個(gè)記錄都會(huì)分配一個(gè)id號(hào)來表示順序,我們稱之為offset,offset用來唯一的標(biāo)識(shí)分區(qū)中每一條記錄。可以設(shè)置為“自動(dòng)提交”與“手動(dòng)提交”。

4、Kafka中的AR、ISR、OSR代表什么

  • AR:Assigned Replicas 指當(dāng)前分區(qū)中的所有副本。
  • ISR:In-Sync Replicas 副本同步隊(duì)列。ISR中包括Leader和Foller。如果Leader進(jìn)程掛掉,會(huì)在ISR隊(duì)列中選擇一個(gè)服務(wù)作為新的Leader。有replica.lag.max.message(延遲條數(shù))和replica.lag.time.max.ms(延遲時(shí)間)兩個(gè)參數(shù)決定一臺(tái)服務(wù)器是否可以加入ISR副本隊(duì)列,在0.10版本之后移除了replica.lag.max.message(延遲條數(shù))參數(shù),防治服務(wù)頻繁的進(jìn)出隊(duì)列。任意一個(gè)維度超過閾值都會(huì)把Follower踢出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也會(huì)先存放在OSR中。
  • OSR:(Out-of-Sync Replicas)非同步副本隊(duì)列。與leader副本同步滯后過多的副本(不包括leader副本)組成OSR。如果OSR集合中有follower副本“追上”了leader副本,那么leader副本會(huì)把它從OSR集合轉(zhuǎn)移至ISR集合。默認(rèn)情況下,當(dāng)leader副本發(fā)生故障時(shí),只有在ISR集合中的副本才有資格被選舉為新的leader,而在OSR集合中的副本則沒有任何機(jī)會(huì)(不過這個(gè)原則也可以通過修改unclean.leader.election.enable參數(shù)配置來改變)。unclean.leader.election.enable 為true的話,意味著非ISR集合的broker 也可以參與選舉,這樣就有可能發(fā)生數(shù)據(jù)丟失和數(shù)據(jù)不一致的情況,Kafka的可靠性就會(huì)降低;而如果unclean.leader.election.enable參數(shù)設(shè)置為false,Kafka的可用性就會(huì)降低。

ISR的伸縮:1)Leader跟蹤維護(hù)ISR中follower滯后狀態(tài),落后太多或失效時(shí),leade把他們從ISR剔除。2)OSR中follower“追上”Leader,在ISR中才有資格選舉leader。

5、HW、LEO代表什么

  • LEO (Log End Offset),標(biāo)識(shí)當(dāng)前日志文件中下一條待寫入的消息的offset。上圖中offset為9的位置即為當(dāng)前日志文件的 LEO,LEO 的大小相當(dāng)于當(dāng)前日志分區(qū)中最后一條消息的offset值加1.分區(qū) ISR 集合中的每個(gè)副本都會(huì)維護(hù)自身的 LEO ,而 ISR 集合中最小的 LEO 即為分區(qū)的 HW,對(duì)消費(fèi)者而言只能消費(fèi) HW 之前的消息。
  • HW:replica高水印值,副本中最新一條已提交消息的位移。leader 的HW值也就是實(shí)際已提交消息的范圍,每個(gè)replica都有HW值,但僅僅leader中的HW才能作為標(biāo)示信息。什么意思呢,就是說當(dāng)按照參數(shù)標(biāo)準(zhǔn)成功完成消息備份(成功同步給follower replica后)才會(huì)更新HW的值,代表消息理論上已經(jīng)不會(huì)丟失,可以認(rèn)為“已提交”。

6、ISR收縮性

啟動(dòng) Kafka時(shí)候自動(dòng)開啟的兩個(gè)定時(shí)任務(wù),“isr-expiration"和”isr-change-propagation"。

  • isr-expiration:isr-expiration任務(wù)會(huì)周期性的檢測(cè)每個(gè)分區(qū)是否需要縮減其ISR集合,相當(dāng)于一個(gè)紀(jì)檢委員,巡查尖子班時(shí)候發(fā)現(xiàn)有學(xué)生睡覺打牌看小說,就把它的座位移除尖子班,縮減ISR,寧缺毋濫。同樣道理,如果follower數(shù)據(jù)同步趕上leader,那么該follower就能進(jìn)入ISR尖子班,擴(kuò)充。上面關(guān)于ISR尖子班人員的所見,都會(huì)記錄到isrChangeSet中,想象成是一個(gè)名單列表,誰能進(jìn),誰要出,都記錄在案。
  • isr-change-propagation:作用就是檢查isrChangeSet,按照名單上的信息移除和遷入,一般是2500ms檢查一次,但是為了防止頻繁收縮擴(kuò)充影響性能,不是每次都能做變動(dòng),必須滿足:1、上一次ISR集合發(fā)生變化距離現(xiàn)在已經(jīng)超過5秒,2、上一次寫入zookeeper的時(shí)候距離現(xiàn)在已經(jīng)超過60秒。這兩個(gè)條件都滿足,那么就開始換座位!這兩個(gè)條件可以由我們來配置。
  • Kafka使用這種ISR收縮的方式有效的權(quán)衡了數(shù)據(jù)可靠性與性能之間的關(guān)系。

7、kafka follower如何與leader同步數(shù)據(jù)

Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。完全同步復(fù)制要求All Alive Follower都復(fù)制完,這條消息才會(huì)被認(rèn)為commit,這種復(fù)制方式極大的影響了吞吐率。而異步復(fù)制方式下,Follower異步的從Leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被Leader寫入log就被認(rèn)為已經(jīng)commit,這種情況下,如果leader掛掉,會(huì)丟失數(shù)據(jù),kafka使用ISR的方式很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。Follower可以批量的從Leader復(fù)制數(shù)據(jù),而且Leader充分利用磁盤順序讀以及send file(zero copy)機(jī)制,這樣極大的提高復(fù)制性能,內(nèi)部批量寫磁盤,大幅減少了Follower與Leader的消息量差。

8、Zookeeper 在 Kafka 中的作用(早期)

zookeeper 是一個(gè)分布式的協(xié)調(diào)組件,早期版本的kafka用zk做meta信息存儲(chǔ),consumer的消費(fèi)狀態(tài),group的管理以及 offset的值。考慮到zk本身的一些因素以及整個(gè)架構(gòu)較大概率存在單點(diǎn)問題,新版本中逐漸弱化了zookeeper的作用。新的consumer使用了kafka內(nèi)部的group coordination協(xié)議,也減少了對(duì)zookeeper的依賴,

但是broker依然依賴于ZK,zookeeper 在kafka中還用來選舉controller 和 檢測(cè)broker是否存活等等。

1. Broker注冊(cè):Broker是分布式部署并且互相獨(dú)立,此時(shí)需要有一個(gè)注冊(cè)系統(tǒng)能夠?qū)⒄麄€(gè)集群中的Broker管理起來,此時(shí)就用到的Zookeeper。在Zookeeper上會(huì)有一個(gè)專門用來進(jìn)行Broker服務(wù)器列表記錄的節(jié)點(diǎn):/brokes/ids

2.Topic注冊(cè):在kafka中,同一個(gè)Topic的消息會(huì)被分成多個(gè)分區(qū)并將其分布在多個(gè)Broker上,這些分區(qū)信息以及與Broker的對(duì)應(yīng)關(guān)系也都是由Zookeeper維護(hù),由專門的節(jié)點(diǎn)記錄:/brokers/topics

3.消費(fèi)者注冊(cè):消費(fèi)者服務(wù)器在初始化啟動(dòng)時(shí)加入消費(fèi)者分組的步驟如下:注冊(cè)到消費(fèi)者分組。每個(gè)消費(fèi)者服務(wù)器啟動(dòng)時(shí),都會(huì)到Zookeeper的指定節(jié)點(diǎn)下創(chuàng)建一個(gè)屬于自己的消費(fèi)者節(jié)點(diǎn),例如/consumer/[groupid]/ids/[consumerid],完成節(jié)點(diǎn)創(chuàng)建后,消費(fèi)者就會(huì)將自己訂閱的Topic信息寫入該臨時(shí)節(jié)點(diǎn)。

  • 對(duì)消費(fèi)者分組中的消費(fèi)者的變化注冊(cè)監(jiān)聽:每個(gè)消費(fèi)者都需要關(guān)注所屬消費(fèi)者分組中的其他消費(fèi)者服務(wù)器的變化情況,即對(duì)/consumer/[group_id]/ids節(jié)點(diǎn)注冊(cè)子節(jié)點(diǎn)變化的Watcher監(jiān)聽,一旦發(fā)現(xiàn)消費(fèi)者新增或減少,就觸發(fā)消費(fèi)者的負(fù)載均衡。
  • 對(duì)Broker服務(wù)器變化注冊(cè)監(jiān)聽:消費(fèi)者需要對(duì)/broker/ids[0-N]中的節(jié)點(diǎn)進(jìn)行監(jiān)聽,如果發(fā)現(xiàn)Broker服務(wù)器列表發(fā)生變化,那么就根據(jù)具體情況來決定是否需要進(jìn)行消費(fèi)者負(fù)載均衡。
  • 進(jìn)行消費(fèi)者負(fù)載均衡:為了讓同一個(gè)Topic下不同分區(qū)的消息盡量均衡地被多個(gè)消費(fèi)者消費(fèi)而進(jìn)行消費(fèi)者與消息分區(qū)分配的過程,通常對(duì)于一個(gè)消費(fèi)者分組,如果組內(nèi)的消費(fèi)者服務(wù)器發(fā)生變更或Broker服務(wù)器發(fā)生變更,會(huì)進(jìn)行消費(fèi)者負(fù)載均衡。
  • Offset記錄 在消費(fèi)者對(duì)指定消息分區(qū)進(jìn)行消費(fèi)的過程中,需要定時(shí)地將分區(qū)消息的消費(fèi)進(jìn)度Offset記錄到Zookeeper上,以便對(duì)該消費(fèi)者進(jìn)行重啟或者其他消費(fèi)者重新接管該消息分區(qū)的消息消費(fèi)后,能夠從之前的進(jìn)度繼續(xù)進(jìn)行消息消費(fèi)。Offset在Zookeeper中由一個(gè)專門節(jié)點(diǎn)進(jìn)行記錄,其節(jié)點(diǎn)路徑為:/consumers/[groupid]/offsets/[topic]/[brokerid-partition_id] 節(jié)點(diǎn)內(nèi)容就是Offset的值。

4.生產(chǎn)者負(fù)載均衡:由于同一個(gè)Topic消息會(huì)被分區(qū)并將其分布在多個(gè)Broker上,因此生產(chǎn)者需要將消息合理地發(fā)送到這些分布式的Broker上,那么如何實(shí)現(xiàn)生產(chǎn)者的負(fù)載均衡,Kafka支持傳統(tǒng)的四層負(fù)載均衡,也支持Zookeeper方式實(shí)現(xiàn)負(fù)載均衡。

  1. 四層負(fù)載均衡:根據(jù)生產(chǎn)者的IP地址和端口來為其圈定一個(gè)相關(guān)聯(lián)的Broker。通常,一個(gè)生產(chǎn)者只會(huì)對(duì)應(yīng)單個(gè)Broker,然后該生產(chǎn)者產(chǎn)生的消息都發(fā)送到該Broker。這種方式邏輯簡(jiǎn)單,每個(gè)生產(chǎn)者不需要同其他系統(tǒng)建立額外的TCP鏈接,只需要和Broker維護(hù)單個(gè)TCP連接即可。但是無法做到真正的負(fù)載均衡,因?yàn)閷?shí)際系統(tǒng)中的每個(gè)生產(chǎn)者產(chǎn)生的消息量及每個(gè)Broker的消息存儲(chǔ)量都是不一樣的,如果有些生產(chǎn)者產(chǎn)生的消息遠(yuǎn)多于其他生產(chǎn)者的話,那么會(huì)導(dǎo)致不同的Broker接收到的消息總數(shù)差異巨大,同時(shí),生產(chǎn)者也無法實(shí)時(shí)感知到Broker的新增和刪除。
  2. 使用Zookeeper進(jìn)行負(fù)載均衡,由于每個(gè)Broker啟動(dòng)時(shí),都會(huì)完成Broker注冊(cè)過程,生產(chǎn)者會(huì)通過該節(jié)點(diǎn)的變化來動(dòng)態(tài)地感知到Broker服務(wù)器列表的變更,這樣就可以實(shí)現(xiàn)動(dòng)態(tài)的負(fù)載均衡機(jī)制。

5.消費(fèi)者負(fù)載均衡:與生產(chǎn)者相似,Kafka中的消費(fèi)者同樣需要進(jìn)行負(fù)載均衡來實(shí)現(xiàn)多個(gè)消費(fèi)者合理地從對(duì)應(yīng)的Broker服務(wù)器上接收消息,每個(gè)消費(fèi)者分組包含若干消費(fèi)者,每條消息都只會(huì)發(fā)送給分組中的一個(gè)消費(fèi)者,不同的消費(fèi)者分組消費(fèi)自己特定的Topic下面的消息,互不干擾。

6.分區(qū)與消費(fèi)者的關(guān)系:消費(fèi)組consumer group下有多個(gè)Consumer(消費(fèi)者)。對(duì)于每個(gè)消費(fèi)者組(consumer group),Kafka都會(huì)為其分配一個(gè)全局唯一的Group ID,Group內(nèi)部的所有消費(fèi)者共享該ID。訂閱的topic下的每個(gè)分區(qū)只能分配給某個(gè)group下的一個(gè)consumer(當(dāng)然該分區(qū)還可以被分配給其他group) 同時(shí),kafka為每個(gè)消費(fèi)者分配一個(gè)Consumer ID,通常采用“Hostname:UUID”形式表示。在kafka中,規(guī)定了每個(gè)消息分區(qū)只能被同組的一個(gè)消費(fèi)者進(jìn)行消費(fèi),因此,需要在zookeeper上記錄消息分區(qū)與Consumer之間的關(guān)系,每個(gè)消費(fèi)者一旦確定了對(duì)一個(gè)消費(fèi)分區(qū)的消費(fèi)權(quán)利,需要將其Consumer ID寫入到平Zookeeper對(duì)應(yīng)消息分區(qū)的臨時(shí)節(jié)點(diǎn)上,例如:/consumers/[groupid]/owners/topic/[brokerid-partitionid] 其中,[brokerid-partition_id]就是一個(gè)消息分區(qū)的表示,節(jié)點(diǎn)內(nèi)容就是該消息分區(qū)上消費(fèi)者的Consumer ID。

7.補(bǔ)充:早期版本的 kafka 用 zk 做 meta 信息存儲(chǔ),consumer 的消費(fèi)狀態(tài),group 的管理以及 offse t的值。考慮到zk本身的一些因素以及整個(gè)架構(gòu)較大概率存在單點(diǎn)問題,新版本中確實(shí)逐漸弱化了zookeeper的作用。新的consumer使用了kafka內(nèi)部的group coordination協(xié)議,也減少了對(duì)zookeeper的依賴

9、Kafka如何快速讀取指定offset的消息

Kafka本地日志存儲(chǔ)根據(jù)segement分段存儲(chǔ),默認(rèn)1G,其中segement包括index稀疏索引文件和log數(shù)據(jù)文件。其中index文件索引通過offset與posttion來定位數(shù)據(jù)文件中指定message的消息。其中index和log的文件名都為當(dāng)前segement的起始o(jì)ffset。

讀取offset=170418的消息,首先通過offset根據(jù)二分法定位到index索引文件,然后根據(jù)索引文件中的[offset,position](position為物理偏移地址)去log中獲取指定offset的message數(shù)據(jù)。

10、生產(chǎn)者發(fā)送消息有哪些模式

異步發(fā)送

對(duì)于生產(chǎn)者的異步發(fā)送來說就是,我發(fā)送完當(dāng)前消息后,并不需要你將當(dāng)前消息的發(fā)送結(jié)果立馬告訴我,而是可以隨即進(jìn)行下一條消息的發(fā)送。但是我會(huì)允許添加一個(gè)回調(diào)函數(shù),接收你后續(xù)返回的發(fā)送結(jié)果。異步發(fā)送這塊我們直接調(diào)用kafkaProducer的send方法即可實(shí)現(xiàn)異步發(fā)送。

同步發(fā)送

如果生產(chǎn)者需要使用同步發(fā)送的方式,只需要拿到 send 方法返回的future對(duì)象后,調(diào)用其 get() 方法即可。此時(shí)如果消息還未發(fā)送到broker中,get方法會(huì)被阻塞,等到 broker 返回消息發(fā)送結(jié)果后會(huì)跳出當(dāng)前方法并將結(jié)果返回。

11、發(fā)送消息的分區(qū)策略有哪些

所謂分區(qū)寫入策略,即是生產(chǎn)者將數(shù)據(jù)寫入到kafka主題后,kafka如何將數(shù)據(jù)分配到不同分區(qū)中的策略。

常見的有三種策略,輪詢策略,隨機(jī)策略,和按鍵保存策略。其中輪詢策略是默認(rèn)的分區(qū)策略,而隨機(jī)策略則是較老版本的分區(qū)策略,不過由于其分配的均衡性不如輪詢策略,故而后來改成了輪詢策略為默認(rèn)策略。

輪詢策略

所謂輪詢策略,即按順序輪流將每條數(shù)據(jù)分配到每個(gè)分區(qū)中。

舉個(gè)例子,假設(shè)主題test有三個(gè)分區(qū),分別是分區(qū)A,分區(qū)B和分區(qū)C。那么主題對(duì)接收到的第一條消息寫入A分區(qū),第二條消息寫入B分區(qū),第三條消息寫入C分區(qū),第四條消息則又寫入A分區(qū),依此類推。

輪詢策略是默認(rèn)的策略,故而也是使用最頻繁的策略,它能最大限度保證所有消息都平均分配到每一個(gè)分區(qū)。除非有特殊的業(yè)務(wù)需求,否則使用這種方式即可。

隨機(jī)策略

隨機(jī)策略,也就是每次都隨機(jī)地將消息分配到每個(gè)分區(qū)。其實(shí)大概就是先得出分區(qū)的數(shù)量,然后每次獲取一個(gè)隨機(jī)數(shù),用該隨機(jī)數(shù)確定消息發(fā)送到哪個(gè)分區(qū)。

在比較早的版本,默認(rèn)的分區(qū)策略就是隨機(jī)策略,但其實(shí)使用隨機(jī)策略也是為了更好得將消息均衡寫入每個(gè)分區(qū)。但后來發(fā)現(xiàn)對(duì)這一需求而言,輪詢策略的表現(xiàn)更優(yōu),所以社區(qū)后來的默認(rèn)策略就是輪詢策略了。

hash(Key)

按鍵保存策略,就是當(dāng)生產(chǎn)者發(fā)送數(shù)據(jù)的時(shí)候,可以指定一個(gè)key,計(jì)算這個(gè)key的hashCode值,按照hashCode的值對(duì)不同消息進(jìn)行存儲(chǔ)。

至于要如何實(shí)現(xiàn),那也簡(jiǎn)單,只要讓生產(chǎn)者發(fā)送的時(shí)候指定key就行。欸剛剛不是說默認(rèn)的是輪詢策略嗎?其實(shí)啊,kafka默認(rèn)是實(shí)現(xiàn)了兩個(gè)策略,沒指定key的時(shí)候就是輪詢策略,有的話那激素按鍵保存策略了。

上面有說到一個(gè)場(chǎng)景,那就是要順序發(fā)送消息到kafka。前面提到的方案是讓所有數(shù)據(jù)存儲(chǔ)到一個(gè)分區(qū)中,但其實(shí)更好的做法,就是使用這種按鍵保存策略。

讓需要順序存儲(chǔ)的數(shù)據(jù)都指定相同的鍵,而不需要順序存儲(chǔ)的數(shù)據(jù)指定不同的鍵,這樣一來,即實(shí)現(xiàn)了順序存儲(chǔ)的需求,又能夠享受到kafka多分區(qū)的優(yōu)勢(shì),豈不美哉。

粘性分區(qū)

所以如果使用默認(rèn)的輪詢partition策略,可能會(huì)造成一個(gè)大的batch被輪詢成多個(gè)小的batch的情況。鑒于此,kafka2.4的時(shí)候推出一種新的分區(qū)策略,即StickyPartitioning Strategy,StickyPartitioning Strategy會(huì)隨機(jī)地選擇另一個(gè)分區(qū)并會(huì)盡可能地堅(jiān)持使用該分區(qū)——即所謂的粘住這個(gè)分區(qū)。

鑒于小batch可能導(dǎo)致延時(shí)增加,之前對(duì)于無Key消息的分區(qū)策略效率很低。社區(qū)于2.4版本引入了黏性分區(qū)策略(StickyPartitioning Strategy)。該策略是一種全新的策略,能夠顯著地降低給消息指定分區(qū)過程中的延時(shí)。使用StickyPartitioner有助于改進(jìn)消息批處理,減少延遲,并減少broker的負(fù)載。

自定義分區(qū)器

實(shí)現(xiàn)partitioner接口

切記分區(qū)是實(shí)現(xiàn)負(fù)載均衡以及高吞吐量的關(guān)鍵,所以一定要在生產(chǎn)者這一端就要考慮好合適的分區(qū)策略,避免造成消息數(shù)據(jù)的“傾斜”,使得某些分區(qū)成為性能瓶頸,從而導(dǎo)致下游數(shù)據(jù)消費(fèi)的性能下降的問題。

12、Kafka可靠性保證(不丟消息)

Kafka精確一次性(Exactly-once)保障之一

Kafka可靠性主要從三個(gè)方面來看,Broker、Producer、Consumer。1. Brokerbroker寫數(shù)據(jù)時(shí)首先寫到PageCache中,pageCache的數(shù)據(jù)通過linux的flusher程序異步批量存儲(chǔ)至磁盤中,此過程稱為刷盤。而pageCache位于內(nèi)存。這部分?jǐn)?shù)據(jù)會(huì)在斷電后丟失。刷盤觸發(fā)條件有三:

  • 主動(dòng)調(diào)用sync或fsync函數(shù)
  • 可用內(nèi)存低于閥值
  • dirty data時(shí)間達(dá)到閥值。dirty是pagecache的一個(gè)標(biāo)識(shí)位,當(dāng)有數(shù)據(jù)寫入到pageCache時(shí),pagecache被標(biāo)注為dirty,數(shù)據(jù)刷盤以后,dirty標(biāo)志清除。

kafka沒有提供同步刷盤的方式,也就是說理論上要完全讓kafka保證單個(gè)broker不丟失消息是做不到的,只能通過調(diào)整刷盤機(jī)制的參數(shù)緩解該情況,比如:

減少刷盤間隔log.flush.interval.ms(在刷新到磁盤之前,任何topic中的消息保留在內(nèi)存中的最長(zhǎng)時(shí)間) 減少刷盤數(shù)據(jù)量大小log.flush.interval.messages(在將消息刷新到磁盤之前,在日志分區(qū)上累積的消息數(shù)量)。

時(shí)間越短,數(shù)據(jù)量越小,性能越差,但是丟失的數(shù)據(jù)會(huì)變少,可靠性越好。這是一個(gè)選擇題。

同時(shí),Kafka通過producer和broker協(xié)同處理消息丟失的情況,一旦producer發(fā)現(xiàn)broker消息丟失,即可自動(dòng)進(jìn)行retry。retry次數(shù)可根據(jù)參數(shù)retries進(jìn)行配置,超過指定次數(shù)會(huì),此條消息才會(huì)被判斷丟失。producer和broker之間,通過ack機(jī)制來判斷消息是否丟失。

  • acks=0,producer不等待broker的響應(yīng),效率最高,但是消息很可能會(huì)丟。
  • acks=1,leader broker收到消息后,不等待其他follower的響應(yīng),即返回ack。也可以理解為ack數(shù)為1。此時(shí),如果follower還沒有收到leader同步的消息leader就掛了,那么消息會(huì)丟失。按照上圖中的例子,如果leader收到消息,成功寫入PageCache后,會(huì)返回ack,此時(shí)producer認(rèn)為消息發(fā)送成功。但此時(shí),按照上圖,數(shù)據(jù)還沒有被同步到follower。如果此時(shí)leader斷電,數(shù)據(jù)會(huì)丟失。
  • acks=-1,leader broker收到消息后,掛起,等待所有ISR列表中的follower返回結(jié)果后,再返回ack。-1等效與all。這種配置下,只有l(wèi)eader寫入數(shù)據(jù)到pagecache是不會(huì)返回ack的,還需要所有的ISR返回“成功”才會(huì)觸發(fā)ack。如果此時(shí)斷電,producer可以知道消息沒有被發(fā)送成功,將會(huì)重新發(fā)送。如果在follower收到數(shù)據(jù)以后,成功返回ack,leader斷電,數(shù)據(jù)將存在于原來的follower中。在重新選舉以后,新的leader會(huì)持有該部分?jǐn)?shù)據(jù)。數(shù)據(jù)從leader同步到follower,需要2步:
  • 數(shù)據(jù)從pageCache被刷盤到disk。因?yàn)橹挥衐isk中的數(shù)據(jù)才能被同步到replica。
  • 數(shù)據(jù)同步到replica,并且replica成功將數(shù)據(jù)寫入PageCache。在producer得到ack后,哪怕是所有機(jī)器都停電,數(shù)據(jù)也至少會(huì)存在于leader的磁盤內(nèi)。
  • 上面第三點(diǎn)提到了ISR的列表的follower,需要配合另一個(gè)參數(shù)才能更好的保證ack的有效性。ISR是Broker維護(hù)的一個(gè)“可靠的follower列表”,in-sync Replica列表,broker的配置包含一個(gè)參數(shù):min.insync.replicas。該參數(shù)表示ISR中最少的副本數(shù)。如果不設(shè)置該值,ISR中的follower列表可能為空。此時(shí)相當(dāng)于acks=1。

Topic 分區(qū)副本

在 Kafka 0.8.0 之前,Kafka 是沒有副本的概念的,那時(shí)候人們只會(huì)用 Kafka 存儲(chǔ)一些不重要的數(shù)據(jù),因?yàn)闆]有副本,數(shù)據(jù)很可能會(huì)丟失。但是隨著業(yè)務(wù)的發(fā)展,支持副本的功能越來越強(qiáng)烈,所以為了保證數(shù)據(jù)的可靠性,Kafka 從 0.8.0 版本開始引入了分區(qū)副本(詳情請(qǐng)參見 KAFKA-50)。也就是說每個(gè)分區(qū)可以人為的配置幾個(gè)副本(比如創(chuàng)建主題的時(shí)候指定 replication-factor,也可以在 Broker 級(jí)別進(jìn)行配置 default.replication.factor),一般會(huì)設(shè)置為3。

Kafka 可以保證單個(gè)分區(qū)里的事件是有序的,分區(qū)可以在線(可用),也可以離線(不可用)。在眾多的分區(qū)副本里面有一個(gè)副本是 Leader,其余的副本是 follower,所有的讀寫操作都是經(jīng)過 Leader 進(jìn)行的,同時(shí) follower 會(huì)定期地去 leader 上的復(fù)制數(shù)據(jù)。當(dāng) Leader 掛了的時(shí)候,其中一個(gè) follower 會(huì)重新成為新的 Leader。通過分區(qū)副本,引入了數(shù)據(jù)冗余,同時(shí)也提供了 Kafka 的數(shù)據(jù)可靠性。

Kafka 的分區(qū)多副本架構(gòu)是 Kafka 可靠性保證的核心,把消息寫入多個(gè)副本可以使 Kafka 在發(fā)生崩潰時(shí)仍能保證消息的持久性。

2. Producer

producer在發(fā)送數(shù)據(jù)時(shí)可以將多個(gè)請(qǐng)求進(jìn)行合并后異步發(fā)送,合并后的請(qǐng)求首先緩存在本地buffer中,正常情況下,producer客戶端的異步調(diào)用可以通過callback回調(diào)函數(shù)來處理消息發(fā)送失敗或者超時(shí)的情況,但是當(dāng)出現(xiàn)以下情況,將會(huì)出現(xiàn)數(shù)據(jù)丟失

  1. producer異常中斷,buffer中的數(shù)據(jù)將丟失。
  2. producer客戶端內(nèi)存不足,如果采取的策略是丟棄消息(另一種策略是block阻塞),消息也會(huì)丟失。
  3. 消息產(chǎn)生(異步)過快,導(dǎo)致掛起線程過多,內(nèi)存不足,導(dǎo)致程序崩潰,消息丟失。

針對(duì)以上情況,可以有以下解決思路。

  1. producer采用同步方式發(fā)送消息,或者生產(chǎn)數(shù)據(jù)時(shí)采用阻塞的線程池,并且線程數(shù)不宜過多。整體思路就是控制消息產(chǎn)生速度。
  2. 擴(kuò)大buffer的容量配置,配置項(xiàng)為:buffer.memory。這種方法可以緩解數(shù)據(jù)丟失的情況,但不能杜絕。

3.Consumer

Consumer消費(fèi)消息有以下幾個(gè)步驟:

  • 接收消息
  • 處理消息
  • 反饋處理結(jié)果

消費(fèi)方式主要分為兩種

  • 自動(dòng)提交offset,Automatic Offset Committing (enable.auto.commit=true)
  • 手動(dòng)提交offset,Manual Offset Control(enable.auto.commit=false)

Consumer自動(dòng)提交機(jī)制是根據(jù)一定的時(shí)間間隔,將收到的消息進(jìn)行commit,具體配置為:auto.commit.interval.ms。commit和消費(fèi)的過程是異步的,也就是說可能存在消費(fèi)過程未成功,commit消息就已經(jīng)提交,此時(shí)就會(huì)出現(xiàn)消息丟失。我們可將提交類型改為手動(dòng)提交,在消費(fèi)完成后再進(jìn)行提交,這樣可以保證消息“至少被消費(fèi)一次”(at least once),但如果消費(fèi)完成后在提交過程中出現(xiàn)故障,則會(huì)出現(xiàn)重復(fù)消費(fèi)的情況,本章不討論,下章講解。

13、Kafka 是怎么去實(shí)現(xiàn)負(fù)載均衡的

生產(chǎn)者層面

分區(qū)器是生產(chǎn)者層面的負(fù)載均衡。Kafka 生產(chǎn)者生產(chǎn)消息時(shí),根據(jù)分區(qū)器將消息投遞到指定的分區(qū)中,所以 Kafka 的負(fù)載均衡很大程度上依賴于分區(qū)器。Kafka 默認(rèn)的分區(qū)器是 Kafka 提供的 DefaultPartitioner。它的分區(qū)策略是根據(jù) Key 值進(jìn)行分區(qū)分配的:

如果 key 不為 null:對(duì) Key 值進(jìn)行 Hash 計(jì)算,從所有分區(qū)中根據(jù) Key 的 Hash 值計(jì)算出一個(gè)分區(qū)號(hào);擁有相同 Key 值的消息被寫入同一個(gè)分區(qū);如果 key 為 null:消息將以輪詢的方式,在所有可用分區(qū)中分別寫入消息。如果不想使用 Kafka 默認(rèn)的分區(qū)器,用戶可以實(shí)現(xiàn) Partitioner 接口,自行實(shí)現(xiàn)分區(qū)方法。

注:在筆者的理解中,分區(qū)器的負(fù)載均衡與順序性有著一定程度上的矛盾。

  • 負(fù)載均衡的目的是將消息盡可能平均分配,對(duì)于 Kafka 而言,就是盡可能將消息平均分配給所有分區(qū);
  • 如果使用 Kafka 保證順序性,則需要利用到 Kafka 的分區(qū)順序性的特性。
  • 對(duì)于需要保證順序性的場(chǎng)景,通常會(huì)利用 Key 值實(shí)現(xiàn)分區(qū)順序性,那么所有 Key值相同的消息就會(huì)進(jìn)入同一個(gè)分區(qū)。這樣的情況下,對(duì)于大量擁有相同 Key值的消息,會(huì)涌入同一個(gè)分區(qū),導(dǎo)致一個(gè)分區(qū)消息過多,其他分區(qū)沒有消息的情況,即與負(fù)載均衡的思想相悖。

消費(fèi)者層面

主要根據(jù)消費(fèi)者的Rebalance機(jī)制實(shí)現(xiàn),內(nèi)容詳見下章

14、簡(jiǎn)述Kafka的Rebalance機(jī)制

什么是 Rebalance

Rebalance 本質(zhì)上是一種協(xié)議,規(guī)定了一個(gè) Consumer Group 下的所有 consumer 如何達(dá)成一致,來分配訂閱 Topic 的每個(gè)分區(qū)。 例如:某 Group 下有 20 個(gè) consumer 實(shí)例,它訂閱了一個(gè)具有 100 個(gè) partition 的 Topic。正常情況下,kafka 會(huì)為每個(gè) Consumer 平均的分配 5 個(gè)分區(qū)。這個(gè)分配的過程就是 Rebalance。

觸發(fā) Rebalance 的時(shí)機(jī)

Rebalance 的觸發(fā)條件有3個(gè)。

  1. 組成員個(gè)數(shù)發(fā)生變化。例如有新的 consumer 實(shí)例加入該消費(fèi)組或者離開組。
  2. 訂閱的 Topic 個(gè)數(shù)發(fā)生變化。
  3. 訂閱 Topic 的分區(qū)數(shù)發(fā)生變化。

Rebalance 發(fā)生時(shí),Group 下所有 consumer 實(shí)例都會(huì)協(xié)調(diào)在一起共同參與,kafka 能夠保證盡量達(dá)到最公平的分配。但是 Rebalance 過程對(duì) consumer group 會(huì)造成比較嚴(yán)重的影響。在 Rebalance 的過程中 consumer group 下的所有消費(fèi)者實(shí)例都會(huì)停止工作,等待 Rebalance 過程完成。

Rebalance 過程

Rebalance 過程分為兩步:JoinGroup 請(qǐng)求和 SyncGroup 請(qǐng)求。JoinGroup :JoinGroup 請(qǐng)求的主要作用是將組成員訂閱信息發(fā)送給領(lǐng)導(dǎo)者消費(fèi)者,待領(lǐng)導(dǎo)者制定好分配方案后,重平衡流程進(jìn)入到 SyncGroup 請(qǐng)求階段。SyncGroup:SyncGroup 請(qǐng)求的主要目的,就是讓協(xié)調(diào)者把領(lǐng)導(dǎo)者制定的分配方案下發(fā)給各個(gè)組內(nèi)成員。當(dāng)所有成員都成功接收到分配方案后,消費(fèi)者組進(jìn)入到 Stable 狀態(tài),即開始正常的消費(fèi)工作。

15、Kafka 負(fù)載均衡會(huì)導(dǎo)致什么問題

在消費(fèi)者組Rebalance期間,一直等到rebalance結(jié)束前,消費(fèi)者會(huì)出現(xiàn)無法讀取消息,造成整個(gè)消費(fèi)者組一段時(shí)間內(nèi)不可用。

16、如何增強(qiáng)消費(fèi)者的消費(fèi)能力

1、如果是Kafka消費(fèi)能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時(shí)提升消費(fèi)組的消費(fèi)者數(shù)量,消費(fèi)者數(shù)==分區(qū)數(shù)。兩者缺一不可。

2、如果是下游的數(shù)據(jù)處理不及時(shí):則提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過少(拉取數(shù)據(jù)/處理時(shí)間<生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會(huì)造成數(shù)據(jù)積壓。

3、優(yōu)化消費(fèi)者的處理邏輯,提高處理效率

17、消費(fèi)者與Topic的分區(qū)策略

Range

Range是對(duì)每個(gè)Topic而言的(即一個(gè)Topic一個(gè)Topic分),首先對(duì)同一個(gè)Topic里面的分區(qū)按照序號(hào)進(jìn)行排序,并對(duì)消費(fèi)者按照字母順序進(jìn)行排序。然后用Partitions分區(qū)的個(gè)數(shù)除以消費(fèi)者線程的總數(shù)來決定每個(gè)消費(fèi)者線程消費(fèi)幾個(gè)分區(qū)。如果除不盡,那么前面幾個(gè)消費(fèi)者線程將會(huì)多消費(fèi)一個(gè)分區(qū)。

RoundRobin

將消費(fèi)組內(nèi)所有消費(fèi)者以及消費(fèi)者所訂閱的所有topic的partition按照字典序排序,然后通過輪詢方式逐個(gè)將分區(qū)以此分配給每個(gè)消費(fèi)者。使用RoundRobin策略有兩個(gè)前提條件必須滿足:

  • 同一個(gè)消費(fèi)者組里面的所有消費(fèi)者的num.streams(消費(fèi)者消費(fèi)線程數(shù))必須相等;
  • 每個(gè)消費(fèi)者訂閱的主題必須相同。

StickyAssignor

無論是RangeAssignor,還是RoundRobinAssignor,當(dāng)前的分區(qū)分配算法都沒有考慮上一次的分配結(jié)果。顯然,在執(zhí)行一次新的分配之前,如果能考慮到上一次分配的結(jié)果,盡量少的調(diào)整分區(qū)分配的變動(dòng),顯然是能節(jié)省很多開銷的。

Sticky是“粘性的”,可以理解為分配結(jié)果是帶“粘性的”——每一次分配變更相對(duì)上一次分配做最少的變動(dòng)(上一次的結(jié)果是有粘性的),其目標(biāo)有兩點(diǎn):

  1. 分區(qū)的分配盡量的均衡
  2. 每一次重分配的結(jié)果盡量與上一次分配結(jié)果保持一致

StickyAssignor的模式比其他兩種提供更加均衡的分配結(jié)果,在發(fā)生Consumer或者Partition變更的情況下,也能減少不必要的分區(qū)調(diào)整。

18、如何保證消息不被重復(fù)消費(fèi)(消費(fèi)者冪等性)

Kafka精確一次性(Exactly-once)保障之一

冪等性:就是用戶對(duì)于同一操作發(fā)起的一次請(qǐng)求或者多次請(qǐng)求的結(jié)果是一致的,不會(huì)因?yàn)槎啻吸c(diǎn)擊而產(chǎn)生了副作用。

出現(xiàn)原因:

  • 原因1:Consumer在消費(fèi)過程中,被強(qiáng)行kill掉消費(fèi)者線程或異常中斷(消費(fèi)系統(tǒng)宕機(jī)、重啟等),導(dǎo)致實(shí)際消費(fèi)后的數(shù)據(jù),offset沒有提交。
  • 原因2:設(shè)置offset為自動(dòng)提交,關(guān)閉kafka時(shí),如果在close之前,調(diào)用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會(huì)重復(fù)消費(fèi)。
  • 原因3:消費(fèi)超時(shí)導(dǎo)致消費(fèi)者與集群斷開連接,offset尚未提交,導(dǎo)致重平衡后重復(fù)消費(fèi)。一般消費(fèi)超時(shí)(session.time.out)有以下原因:并發(fā)過大,消費(fèi)者突然宕機(jī),處理超時(shí)等。

解決思路:

  1. 提高消費(fèi)能力,提高單條消息的處理速度,例如對(duì)消息處理中比 較耗時(shí)的步驟可通過異步的方式進(jìn)行處理、利用多線程處理等。
  2. 在縮短單條消息消費(fèi)時(shí)常的同時(shí),根據(jù)實(shí)際場(chǎng)景可將session.time.out(Consumer心跳超時(shí)時(shí)間)和max.poll.interval.ms(consumer兩次poll的最大時(shí)間間隔)值設(shè)置大一點(diǎn),避免不必要的rebalance,此外可適當(dāng)減小max.poll.records的值( 表示每次消費(fèi)的時(shí)候,獲取多少條消息),默認(rèn)值是500,可根據(jù)實(shí)際消息速率適當(dāng)調(diào)小。這種思路可解決因消費(fèi)時(shí)間過長(zhǎng)導(dǎo)致的重復(fù)消費(fèi)問題, 對(duì)代碼改動(dòng)較小,但無法絕對(duì)避免重復(fù)消費(fèi)問題。
  3. 根據(jù)業(yè)務(wù)情況制定:引入單獨(dú)去重機(jī)制,例如生成消息時(shí),在消息中加入唯一標(biāo)識(shí)符如主鍵id。寫入時(shí)根據(jù)逐漸主鍵判斷update還是insert。如果寫redis,則每次根據(jù)主鍵id進(jìn)行set即可,天然冪等性。或者使用redis作為緩沖,將id首先寫入redis進(jìn)行重復(fù)判斷,然后在進(jìn)行后續(xù)操作。
  4. 開啟生產(chǎn)者的精確一次性,也就是冪等性, 再引入producer事務(wù) ,即客戶端傳入一個(gè)全局唯一的Transaction ID,這樣即使本次會(huì)話掛掉也能根據(jù)這個(gè)id找到原來的事務(wù)狀態(tài)

19、為什么Kafka不支持讀寫分離

在 Kafka 中,生產(chǎn)者寫入消息、消費(fèi)者讀取消息的操作都是與 leader 副本進(jìn)行交互的,從 而實(shí)現(xiàn)的是一種主寫主讀的生產(chǎn)消費(fèi)模型。

Kafka 并不支持主寫從讀,因?yàn)橹鲗憦淖x有 2 個(gè)很明 顯的缺點(diǎn):

  1. 數(shù)據(jù)一致性問題。數(shù)據(jù)從主節(jié)點(diǎn)轉(zhuǎn)到從節(jié)點(diǎn)必然會(huì)有一個(gè)延時(shí)的時(shí)間窗口,這個(gè)時(shí)間 窗口會(huì)導(dǎo)致主從節(jié)點(diǎn)之間的數(shù)據(jù)不一致。某一時(shí)刻,在主節(jié)點(diǎn)和從節(jié)點(diǎn)中 A 數(shù)據(jù)的值都為 X, 之后將主節(jié)點(diǎn)中 A 的值修改為 Y,那么在這個(gè)變更通知到從節(jié)點(diǎn)之前,應(yīng)用讀取從節(jié)點(diǎn)中的 A 數(shù)據(jù)的值并不為最新的 Y,由此便產(chǎn)生了數(shù)據(jù)不一致的問題。
  2. 延時(shí)問題。類似 Redis 這種組件,數(shù)據(jù)從寫入主節(jié)點(diǎn)到同步至從節(jié)點(diǎn)中的過程需要經(jīng) 歷網(wǎng)絡(luò)→主節(jié)點(diǎn)內(nèi)存→網(wǎng)絡(luò)→從節(jié)點(diǎn)內(nèi)存這幾個(gè)階段,整個(gè)過程會(huì)耗費(fèi)一定的時(shí)間。而在 Kafka 中,主從同步會(huì)比 Redis 更加耗時(shí),它需要經(jīng)歷網(wǎng)絡(luò)→主節(jié)點(diǎn)內(nèi)存→主節(jié)點(diǎn)磁盤→網(wǎng)絡(luò)→從節(jié) 點(diǎn)內(nèi)存→從節(jié)點(diǎn)磁盤這幾個(gè)階段。對(duì)延時(shí)敏感的應(yīng)用而言,主寫從讀的功能并不太適用。

20、Kafka選舉機(jī)制

Kafka選舉主要分為以下三種:

  1. 控制器(Broker)選舉機(jī)制
  2. 分區(qū)副本選舉機(jī)制
  3. 消費(fèi)組選舉機(jī)制

控制器選舉

控制器是Kafka的核心組件,它的主要作用是在Zookeeper的幫助下管理和協(xié)調(diào)整個(gè)Kafka集群包括所有分區(qū)與副本的狀態(tài)。集群中任意一個(gè)Broker都能充當(dāng)控制器的角色,但在運(yùn)行過程中,只能有一個(gè)Broker成為控制器。集群中第一個(gè)啟動(dòng)的Broker會(huì)通過在Zookeeper中創(chuàng)建臨時(shí)節(jié)點(diǎn)/controller來讓自己成為控制器,其他Broker啟動(dòng)時(shí)也會(huì)在zookeeper中創(chuàng)建臨時(shí)節(jié)點(diǎn),但是發(fā)現(xiàn)節(jié)點(diǎn)已經(jīng)存在,所以它們會(huì)收到一個(gè)異常,意識(shí)到控制器已經(jīng)存在,那么就會(huì)在Zookeeper中創(chuàng)建watch對(duì)象,便于它們收到控制器變更的通知。如果控制器與Zookeeper斷開連接或異常退出,其他broker通過watch收到控制器變更的通知,就會(huì)嘗試創(chuàng)建臨時(shí)節(jié)點(diǎn)/controller,如果有一個(gè)Broker創(chuàng)建成功,那么其他broker就會(huì)收到創(chuàng)建異常通知,代表控制器已經(jīng)選舉成功,其他Broker只需創(chuàng)建watch對(duì)象即可。

控制器作用

  1. 主題管理:創(chuàng)建、刪除Topic,以及增加Topic分區(qū)等操作都是由控制器執(zhí)行。
  2. 分區(qū)重分配:執(zhí)行Kafka的reassign腳本對(duì)Topic分區(qū)重分配的操作,也是由控制器實(shí)現(xiàn)。如果集群中有一個(gè)Broker異常退出,控制器會(huì)檢查這個(gè)broker是否有分區(qū)的副本leader,如果有那么這個(gè)分區(qū)就需要一個(gè)新的leader,此時(shí)控制器就會(huì)去遍歷其他副本,決定哪一個(gè)成為新的leader,同時(shí)更新分區(qū)的ISR集合。如果有一個(gè)Broker加入集群中,那么控制器就會(huì)通過Broker ID去判斷新加入的Broker中是否含有現(xiàn)有分區(qū)的副本,如果有,就會(huì)從分區(qū)副本中去同步數(shù)據(jù)。
  3. Preferred leader選舉:因?yàn)樵贙afka集群長(zhǎng)時(shí)間運(yùn)行中,broker的宕機(jī)或崩潰是不可避免的,leader就會(huì)發(fā)生轉(zhuǎn)移,即使broker重新回來,也不會(huì)是leader了。在眾多l(xiāng)eader的轉(zhuǎn)移過程中,就會(huì)產(chǎn)生leader不均衡現(xiàn)象,可能一小部分broker上有大量的leader,影響了整個(gè)集群的性能,所以就需要把leader調(diào)整回最初的broker上,這就需要Preferred leader選舉。
  4. 集群成員管理:控制器能夠監(jiān)控新broker的增加,broker的主動(dòng)關(guān)閉與被動(dòng)宕機(jī),進(jìn)而做其他工作。這也是利用Zookeeper的ZNode模型和Watcher機(jī)制,控制器會(huì)監(jiān)聽Zookeeper中/brokers/ids下臨時(shí)節(jié)點(diǎn)的變化。同時(shí)對(duì)broker中的leader節(jié)點(diǎn)進(jìn)行調(diào)整。
  5. 元數(shù)據(jù)服務(wù):控制器上保存了最全的集群元數(shù)據(jù)信息,其他所有broker會(huì)定期接收控制器發(fā)來的元數(shù)據(jù)更新請(qǐng)求,從而更新其內(nèi)存中的緩存數(shù)據(jù)。

分區(qū)副本選舉機(jī)制

發(fā)生副本選舉的情況:

  1. 創(chuàng)建主題
  2. 增加分區(qū)
  3. 分區(qū)下線(分區(qū)中原先的leader副本下線,此時(shí)分區(qū)需要選舉一個(gè)新的leader上線來對(duì)外提供服務(wù))
  4. 分區(qū)重分配

分區(qū)leader副本的選舉由Kafka控制器負(fù)責(zé)具體實(shí)施。主要過程如下:

  1. 從Zookeeper中讀取當(dāng)前分區(qū)的所有ISR(in-sync replicas)集合。
  2. 調(diào)用配置的分區(qū)選擇算法選擇分區(qū)的leader。

分區(qū)副本分為ISR(同步副本)和OSR(非同步副本),當(dāng)leader發(fā)生故障時(shí),只有“同步副本”才可以被選舉為leader。選舉時(shí)按照集合中副本的順序查找第一個(gè)存活的副本,并且這個(gè)副本在ISR集合中。同時(shí)kafka支持OSR(非同步副本)也參加選舉,Kafka broker端提供了一個(gè)參數(shù)unclean.leader.election.enable,用于控制是否允許非同步副本參與leader選舉;如果開啟,則當(dāng) ISR為空時(shí)就會(huì)從這些副本中選舉新的leader,這個(gè)過程稱為 Unclean leader選舉??梢愿鶕?jù)實(shí)際的業(yè)務(wù)場(chǎng)景選擇是否開啟Unclean leader選舉。開啟 Unclean 領(lǐng)導(dǎo)者選舉可能會(huì)造成數(shù)據(jù)丟失,但好處是,它使得分區(qū) Leader 副本一直存在,不至于停止對(duì)外提供服務(wù),因此提升了高可用性。一般建議是關(guān)閉Unclean leader選舉,因?yàn)橥ǔ?shù)據(jù)的一致性要比可用性重要。

消費(fèi)組(Consumer Group)選主

在Kafka的消費(fèi)端,會(huì)有一個(gè)消費(fèi)者協(xié)調(diào)器以及消費(fèi)組,組協(xié)調(diào)器(Group Coordinator)需要為消費(fèi)組內(nèi)的消費(fèi)者選舉出一個(gè)消費(fèi)組的leader。如果消費(fèi)組內(nèi)還沒有l(wèi)eader,那么第一個(gè)加入消費(fèi)組的消費(fèi)者即為消費(fèi)組的leader,如果某一個(gè)時(shí)刻leader消費(fèi)者由于某些原因退出了消費(fèi)組,那么就會(huì)重新選舉leader,選舉源碼如下:

private val members = new mutable.HashMap[String, MemberMetadata]
leaderId = members.keys.headOption

在組協(xié)調(diào)器中消費(fèi)者的信息是以HashMap的形式存儲(chǔ)的,其中key為消費(fèi)者的member_id,而value是消費(fèi)者相關(guān)的元數(shù)據(jù)信息。而leader的取值為HashMap中的第一個(gè)鍵值對(duì)的key(這種選舉方式等同于隨機(jī))。

消費(fèi)組的Leader和Coordinator沒有關(guān)聯(lián)。消費(fèi)組的leader負(fù)責(zé)Rebalance過程中消費(fèi)分配方案的制定。

21、腦裂問題

controller掛掉后,Kafka集群會(huì)重新選舉一個(gè)新的controller。這里面存在一個(gè)問題,很難確定之前的controller節(jié)點(diǎn)是掛掉還是只是短暫性的故障。如果之前掛掉的controller又正常了,他并不知道自己已經(jīng)被取代了,那么此時(shí)集群中會(huì)出現(xiàn)兩臺(tái)controller。

其實(shí)這種情況是很容易發(fā)生。比如,某個(gè)controller由于GC而被認(rèn)為已經(jīng)掛掉,并選擇了一個(gè)新的controller。在GC的情況下,在最初的controller眼中,并沒有改變?nèi)魏螙|西,該Broker甚至不知道它已經(jīng)暫停了。因此,它將繼續(xù)充當(dāng)當(dāng)前controller,這是分布式系統(tǒng)中的常見情況,稱為腦裂。

假如,處于活躍狀態(tài)的controller進(jìn)入了長(zhǎng)時(shí)間的GC暫停。它的ZooKeeper會(huì)話過期了,之前注冊(cè)的/controller節(jié)點(diǎn)被刪除。集群中其他Broker會(huì)收到zookeeper的這一通知。

由于集群中必須存在一個(gè)controller Broker,所以現(xiàn)在每個(gè)Broker都試圖嘗試成為新的controller。假設(shè)Broker 2速度比較快,成為了最新的controller Broker。此時(shí),每個(gè)Broker會(huì)收到Broker2成為新的controller的通知,由于Broker3正在進(jìn)行"stop the world"的GC,可能不會(huì)收到Broker2成為最新的controller的通知。

等到Broker3的GC完成之后,仍會(huì)認(rèn)為自己是集群的controller,在Broker3的眼中好像什么都沒有發(fā)生一樣。

現(xiàn)在,集群中出現(xiàn)了兩個(gè)controller,它們可能一起發(fā)出具有沖突的命令,就會(huì)出現(xiàn)腦裂的現(xiàn)象。如果對(duì)這種情況不加以處理,可能會(huì)導(dǎo)致嚴(yán)重的不一致。所以需要一種方法來區(qū)分誰是集群當(dāng)前最新的Controller。

Kafka是通過使用epoch number(紀(jì)元編號(hào),也稱為隔離令牌)來完成的。epoch number只是單調(diào)遞增的數(shù)字,第一次選出Controller時(shí),epoch number值為1,如果再次選出新的Controller,則epoch number將為2,依次單調(diào)遞增。

每個(gè)新選出的controller通過Zookeeper 的條件遞增操作獲得一個(gè)全新的、數(shù)值更大的epoch number 。其他Broker 在知道當(dāng)前epoch number 后,如果收到由controller發(fā)出的包含較舊(較小)epoch number的消息,就會(huì)忽略它們,即Broker根據(jù)最大的epoch number來區(qū)分當(dāng)前最新的controller。

上圖,Broker3向Broker1發(fā)出命令:讓Broker1上的某個(gè)分區(qū)副本成為leader,該消息的epoch number值為1。于此同時(shí),Broker2也向Broker1發(fā)送了相同的命令,不同的是,該消息的epoch number值為2,此時(shí)Broker1只聽從Broker2的命令(由于其epoch number較大),會(huì)忽略Broker3的命令,從而避免腦裂的發(fā)生。

22、如何為Kafka集群選擇合適的

Topics/Partitions數(shù)量

1、根據(jù)當(dāng)前topic的消費(fèi)者數(shù)量確認(rèn)

在kafka中,單個(gè)patition是kafka并行操作的最小單元。在producer和broker端,向每一個(gè)分區(qū)寫入數(shù)據(jù)是可以完全并行化的,此時(shí),可以通過加大硬件資源的利用率來提升系統(tǒng)的吞吐量,例如對(duì)數(shù)據(jù)進(jìn)行壓縮。在consumer段,kafka只允許單個(gè)partition的數(shù)據(jù)被一個(gè)consumer線程消費(fèi)。因此,在consumer端,每一個(gè)Consumer Group內(nèi)部的consumer并行度完全依賴于被消費(fèi)的分區(qū)數(shù)量。綜上所述,通常情況下,在一個(gè)Kafka集群中,partition的數(shù)量越多,意味著可以到達(dá)的吞吐量越大。

2、根據(jù)consumer端的最大吞吐量確定

我們可以粗略地通過吞吐量來計(jì)算kafka集群的分區(qū)數(shù)量。假設(shè)對(duì)于單個(gè)partition,producer端的可達(dá)吞吐量為p,Consumer端的可達(dá)吞吐量為c,期望的目標(biāo)吞吐量為t,那么集群所需要的partition數(shù)量至少為max(t/p,t/c)。在producer端,單個(gè)分區(qū)的吞吐量大小會(huì)受到批量大小、數(shù)據(jù)壓縮方法、 確認(rèn)類型(同步/異步)、復(fù)制因子等配置參數(shù)的影響。經(jīng)過測(cè)試,在producer端,單個(gè)partition的吞吐量通常是在10MB/s左右。在consumer端,單個(gè)partition的吞吐量依賴于consumer端每個(gè)消息的應(yīng)用邏輯處理速度。因此,我們需要對(duì)consumer端的吞吐量進(jìn)行測(cè)量。

23、Kafka 分區(qū)數(shù)可以增加或減少嗎,為什么

kafka支持分區(qū)數(shù)增加

例如我們可以使用 bin/kafka-topics.sh -alter --topic --topic topic-name --partitions 3 命令將原本分區(qū)數(shù)為1得topic-name設(shè)置為3。當(dāng)主題中的消息包含有key時(shí)(即key不為null),根據(jù)key來計(jì)算分區(qū)的行為就會(huì)有所影響。當(dāng)topic-config的分區(qū)數(shù)為1時(shí),不管消息的key為何值,消息都會(huì)發(fā)往這一個(gè)分區(qū)中;當(dāng)分區(qū)數(shù)增加到3時(shí),那么就會(huì)根據(jù)消息的key來計(jì)算分區(qū)號(hào),原本發(fā)往分區(qū)0的消息現(xiàn)在有可能會(huì)發(fā)往分區(qū)1或者分區(qū)2中。如此還會(huì)影響既定消息的順序,所以在增加分區(qū)數(shù)時(shí)一定要三思而后行。對(duì)于基于key計(jì)算的主題而言,建議在一開始就設(shè)置好分區(qū)數(shù)量,避免以后對(duì)其進(jìn)行調(diào)整。

Kafka 不支持減少分區(qū)數(shù)。

按照Kafka現(xiàn)有的代碼邏輯而言,此功能完全可以實(shí)現(xiàn),不過也會(huì)使得代碼的復(fù)雜度急劇增大。實(shí)現(xiàn)此功能需要考慮的因素很多,比如刪除掉的分區(qū)中的消息該作何處理?如果隨著分區(qū)一起消失則消息的可靠性得不到保障;如果需要保留則又需要考慮如何保留。直接存儲(chǔ)到現(xiàn)有分區(qū)的尾部,消息的時(shí)間戳就不會(huì)遞增,如此對(duì)于Spark、Flink這類需要消息時(shí)間戳(事件時(shí)間)的組件將會(huì)受到影響;如果分散插入到現(xiàn)有的分區(qū)中,那么在消息量很大的時(shí)候,內(nèi)部的數(shù)據(jù)復(fù)制會(huì)占用很大的資源,而且在復(fù)制期間,此主題的可用性又如何得到保障?與此同時(shí),順序性問題、事務(wù)性問題、以及分區(qū)和副本的狀態(tài)機(jī)切換問題都是不得不面對(duì)的。反觀這個(gè)功能的收益點(diǎn)卻是很低,如果真的需要實(shí)現(xiàn)此類的功能,完全可以重新創(chuàng)建一個(gè)分區(qū)數(shù)較小的主題,然后將現(xiàn)有主題中的消息按照既定的邏輯復(fù)制過去即可。

24、談?wù)勀銓?duì)Kafka生產(chǎn)者冪等性的了解

Kafka精確一次性(Exactly-once)保障之一

生產(chǎn)者冪等性主要避免生產(chǎn)者數(shù)據(jù)重復(fù)提交至Kafka broker中并落盤。在正常情況下,Producer向Broker發(fā)送消息,Broker將消息追加寫到對(duì)應(yīng)的流(即某一Topic的某一Partition)中并落盤,并向Producer返回ACK信號(hào),表示確認(rèn)收到。但是Producer和Broker之間的通信總有可能出現(xiàn)異常,如果消息已經(jīng)寫入,但ACK在半途丟失了,Producer就會(huì)進(jìn)行retry操作再次發(fā)送該消息,造成重復(fù)寫入。

為了實(shí)現(xiàn)Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。

  • PID。每個(gè)新的Producer在初始化的時(shí)候會(huì)被分配一個(gè)唯一的PID,這個(gè)PID對(duì)用戶是不可見的。
  • Sequence Numbler。對(duì)于每個(gè)PID,該P(yáng)roducer發(fā)送數(shù)據(jù)的每個(gè)都對(duì)應(yīng)一個(gè)從0開始單調(diào)遞增的Sequence Number
  • Broker端在緩存中保存了這seq number,對(duì)于接收的每條消息,如果其序號(hào)比Broker緩存中序號(hào)大于1則接受它,否則將其丟棄,這樣就可以實(shí)現(xiàn)了消息重復(fù)提交了.但是只能保證單個(gè)Producer對(duì)于同一個(gè)的Exactly Once語義

Producer使用冪等性的示例非常簡(jiǎn)單,與正常情況下Producer使用相比變化不大,只需要 把Producer的配置enable.idempotence設(shè)置為true即可,如下所示:

Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
//當(dāng)enable.idempotence為true時(shí)acks默認(rèn)為 all
// props.put("acks", "all");
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");

Prodcuer 冪等性對(duì)外保留的接口非常簡(jiǎn)單,其底層的實(shí)現(xiàn)對(duì)上層應(yīng)用做了很好的封裝,應(yīng)用層并不需要去關(guān)心具體的實(shí)現(xiàn)細(xì)節(jié),對(duì)用戶非常友好

Kafka的冪等性實(shí)現(xiàn)了對(duì)于單個(gè)Producer會(huì)話、單個(gè)TopicPartition級(jí)別的不重不漏,也就是最細(xì)粒度的保證。如果Producer重啟(PID發(fā)生變化),或者寫入是跨Topic、跨Partition的,單純的冪等性就會(huì)失效,需要更高級(jí)別的事務(wù)性來解決了。當(dāng)然事務(wù)性的原理更加復(fù)雜

25、談?wù)勀銓?duì) Kafka事務(wù)的了解

冪等性可以保證單個(gè)Producer會(huì)話、單個(gè)TopicPartition、單個(gè)會(huì)話session的不重不漏,如果Producer重啟,或者是寫入跨Topic、跨Partition的消息,冪等性無法保證。此時(shí)需要用到Kafka事務(wù)。Kafka 的事務(wù)處理,主要是允許應(yīng)用可以把消費(fèi)和生產(chǎn)的 batch 處理(涉及多個(gè) Partition)在一個(gè)原子單元內(nèi)完成,操作要么全部完成、要么全部失敗。為了實(shí)現(xiàn)這種機(jī)制,我們需要應(yīng)用能提供一個(gè)唯一 id,即使故障恢復(fù)后也不會(huì)改變,這個(gè) id 就是 TransactionnalId(也叫 txn.id),txn.id 可以跟內(nèi)部的 PID 1:1 分配,它們不同的是 txn.id 是用戶提供的,而 PID 是 Producer 內(nèi)部自動(dòng)生成的(并且故障恢復(fù)后這個(gè) PID 會(huì)變化),有了 txn.id 這個(gè)機(jī)制,就可以實(shí)現(xiàn)多 partition、跨會(huì)話的 EOS 語義。當(dāng)用戶使用 Kafka 的事務(wù)性時(shí),Kafka 可以做到的保證:

  1. 跨會(huì)話的冪等性寫入:即使中間故障,恢復(fù)后依然可以保持冪等性;
  2. 跨會(huì)話的事務(wù)恢復(fù):如果一個(gè)應(yīng)用實(shí)例掛了,啟動(dòng)的下一個(gè)實(shí)例依然可以保證上一個(gè)事務(wù)完成(commit 或者 abort);
  3. 跨多個(gè) Topic-Partition 的冪等性寫入,Kafka 可以保證跨多個(gè) Topic-Partition 的數(shù)據(jù)要么全部寫入成功,要么全部失敗,不會(huì)出現(xiàn)中間狀態(tài)。

事務(wù)性示例

Kafka 事務(wù)性的使用方法也非常簡(jiǎn)單,用戶只需要在 Producer 的配置中配置 transactional.id,通過 initTransactions() 初始化事務(wù)狀態(tài)信息,再通過 beginTransaction() 標(biāo)識(shí)一個(gè)事務(wù)的開始,然后通過 commitTransaction() 或 abortTransaction() 對(duì)事務(wù)進(jìn)行 commit 或 abort,示例如下所示:生產(chǎn)者:

Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
try {String msg = "matt test";producer.beginTransaction();producer.send(new ProducerRecord(topic, "0", msg.toString()));producer.send(new ProducerRecord(topic, "1", msg.toString()));producer.send(new ProducerRecord(topic, "2", msg.toString()));producer.commitTransaction();
} catch (ProducerFencedException e1) {e1.printStackTrace();producer.close();
} catch (KafkaException e2) {e2.printStackTrace();producer.abortTransaction();
}
producer.close();

消費(fèi)者:消費(fèi)者應(yīng)該設(shè)置提交事務(wù)的隔離級(jí)別

properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");

Kafka中只有兩種事務(wù)隔離級(jí)別:readcommitted、readuncommitted 設(shè)置為readcommitted時(shí)候是生產(chǎn)者事務(wù)已提交的數(shù)據(jù)才能讀取到。在執(zhí)行 commitTransaction() 或 abortTransaction() 方法前,設(shè)置為“readcommitted”的消費(fèi)端應(yīng)用是消費(fèi)不到這些消息的,不過在 KafkaConsumer 內(nèi)部會(huì)緩存這些消息,直到生產(chǎn)者執(zhí)行 commitTransaction() 方法之后它才能將這些消息推送給消費(fèi)端應(yīng)用。同時(shí)KafkaConsumer會(huì)根據(jù)分區(qū)對(duì)數(shù)據(jù)進(jìn)行整合,推送時(shí)按照分區(qū)順序進(jìn)行推送。而不是按照數(shù)據(jù)發(fā)送順序。反之,如果生產(chǎn)者執(zhí)行了 abortTransaction() 方法,那么 KafkaConsumer 會(huì)將這些緩存的消息丟棄而不推送給消費(fèi)端應(yīng)用。設(shè)置為read_uncommitted時(shí)候可以讀取到未提交的數(shù)據(jù)(報(bào)錯(cuò)終止前的數(shù)據(jù))

26、Kafka消息是采用Pull模式,還是Push模式

push模式下,消費(fèi)者速率主要由生產(chǎn)者決定,當(dāng)消息生產(chǎn)速率遠(yuǎn)大于消費(fèi)速率,消費(fèi)者容易崩潰,如果為了避免consumer崩潰而采用較低的推送速率,將可能導(dǎo)致一次只推送較少的消息而造成浪費(fèi)。Pull模式可以根據(jù)自己的消費(fèi)能力拉取數(shù)據(jù)。Push模式必須在不知道下游consumer消費(fèi)能力和消費(fèi)策略的情況下決定是立即推送每條消息還是緩存之后批量推送。Pull有個(gè)缺點(diǎn)是,如果broker沒有可供消費(fèi)的消息,將導(dǎo)致consumer不斷輪詢。但是可以在消費(fèi)者設(shè)置輪詢間隔。

27、Kafka缺點(diǎn)

  1. 由于是批量發(fā)送,數(shù)據(jù)并非真正的實(shí)時(shí);
  2. 對(duì)于mqtt協(xié)議不支持;
  3. 不支持物聯(lián)網(wǎng)傳感數(shù)據(jù)直接接入;
  4. 僅支持統(tǒng)一分區(qū)內(nèi)消息有序,無法實(shí)現(xiàn)全局消息有序;
  5. 監(jiān)控不完善,需要安裝插件;
  6. 依賴zookeeper進(jìn)行元數(shù)據(jù)管理;3.0版本去除

28、Kafka什么時(shí)候會(huì)丟數(shù)據(jù)

broker端消費(fèi)丟失

broker端的消息不丟失,其實(shí)就是用partition副本機(jī)制來保證。

  1. unclean.leader.election為true,且選舉出的首領(lǐng)分區(qū)為OSR時(shí) 可能就會(huì)發(fā)生消息丟失
  2. min.insync.replicas為N,則至少要存在N個(gè)同步副本才能向分區(qū)寫入數(shù)據(jù)。如果同步副本數(shù)量小于N時(shí)broker就會(huì)停止接收所有生產(chǎn)者的消息、生產(chǎn)者會(huì)出現(xiàn)異常,如果無法正確處理異常,則消息丟失。此時(shí)消費(fèi)者仍然可以讀取已有數(shù)據(jù)、變成只讀狀態(tài)。如果Topic只有一個(gè)同步副本,那么在這個(gè)副本變?yōu)椴豢捎脮r(shí),數(shù)據(jù)就可能會(huì)丟失。
  3. kafka的數(shù)據(jù)一開始是存儲(chǔ)在PageCache并定期flush到磁盤上的,如果出現(xiàn)斷電或者機(jī)器故障等,PageCache上的數(shù)據(jù)就丟失了。

生產(chǎn)者端

  • ack有3種狀態(tài)保證消息被安全生產(chǎn) ack=0,消息傳輸?shù)紹roker端沒收到Broker的反饋即發(fā)送下一條,網(wǎng)絡(luò)故障導(dǎo)致小東西丟失。ack=1,如果剛好leader partition掛了,數(shù)據(jù)就會(huì)丟失。ack=all,min.insync.replicas如果小于N或者Topic只有一個(gè)同步副本。
  • 消息重試機(jī)制未開啟。
  • 當(dāng)前消息過大,超過max.request.size大小,默認(rèn)為1MB
  • 生產(chǎn)者速率超過消費(fèi)者,緩存池空間占滿后,生產(chǎn)線程阻塞超過最大時(shí)間,此時(shí)生產(chǎn)者會(huì)拋出異常,如果沒有處理好則會(huì)丟失數(shù)據(jù)。

消費(fèi)者端

enable.auto.commit=true,消費(fèi)在處理之前提交了offset,則處理異??赡軙?huì)造成消息的丟失。enable.auto.commit=false,Consumer手動(dòng)批量提交位點(diǎn),在批量位點(diǎn)中某個(gè)位點(diǎn)數(shù)據(jù)異常時(shí),沒有正確處理異常,而是將批量位點(diǎn)的最后一個(gè)位點(diǎn)提交,導(dǎo)致異常數(shù)據(jù)丟失

29、Kafka分區(qū)數(shù)越多越好嗎

并非分區(qū)數(shù)量越多,效率越高

  • Topic 每個(gè) partition 在 Kafka 路徑下都有一個(gè)自己的目錄,該目錄下有兩個(gè)主要的文件:base_offset.logbase_offset.index。Kafka 服務(wù)端的 ReplicaManager 會(huì)為每個(gè) Broker 節(jié)點(diǎn)保存每個(gè)分區(qū)的這兩個(gè)文件的文件句柄。所以如果分區(qū)過多,ReplicaManager 需要保持打開狀態(tài)的文件句柄數(shù)也就會(huì)很多。
  • 每個(gè) Producer, Consumer 進(jìn)程都會(huì)為分區(qū)緩存消息,如果分區(qū)過多,緩存的消息越多,占用的內(nèi)存就越大;
  • n 個(gè)分區(qū)有 1 個(gè) Leader,(n-1) 個(gè) Follower,如果運(yùn)行過程中 Leader 掛了,則會(huì)從剩余 (n-1) 個(gè) Followers 中選舉新 Leader;如果有成千上萬個(gè)分區(qū),那么需要很長(zhǎng)時(shí)間的選舉,消耗較大的性能。

30、Kafka如何保證消息的有序性

單分區(qū)

Kafka在特定條件下可以保障單分區(qū)消息的有序性

kafka在發(fā)送消息過程中,正常情況下是有序的,如果消息出現(xiàn)重試,則會(huì)造成消息亂序。導(dǎo)致亂序的原因是:max.in.flight.requests.per.connection默認(rèn)值為5。

該參數(shù)指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前,請(qǐng)求隊(duì)列中可以提交多少個(gè)請(qǐng)求,用于提高網(wǎng)絡(luò)吞吐量。

圖中,batch1-5在請(qǐng)求隊(duì)列中,batch1作為最新數(shù)據(jù)進(jìn)行提交,提交失敗后如果開啟重試機(jī)制,則batch1會(huì)重新添加到本地緩沖池的頭部,然后提交至請(qǐng)求隊(duì)列中重新發(fā)送。此時(shí)batch1的順序會(huì)排在batch5之后,發(fā)生了亂序。

解決方式是將max.in.flight.requests.per.connection設(shè)置為1,消息隊(duì)列中只允許有一個(gè)請(qǐng)求,這樣消息失敗后,可以第一時(shí)間發(fā)送,不會(huì)產(chǎn)生亂序,但是會(huì)降低網(wǎng)絡(luò)吞吐量。

或者開啟生產(chǎn)者冪等性設(shè)置,開啟后,該P(yáng)roducer發(fā)送的消息都對(duì)應(yīng)一個(gè)單調(diào)增的Sequence Number。同樣的Broker端也會(huì)為每個(gè)生產(chǎn)者的每條消息維護(hù)一個(gè)序號(hào),并且每commit一條數(shù)據(jù)時(shí)就會(huì)將其序號(hào)遞增。對(duì)于接收到的數(shù)據(jù),如果其序號(hào)比Borker維護(hù)的序號(hào)大一(即表示是下一條數(shù)據(jù)),Broker會(huì)接收它,否則將其丟棄。如果消息序號(hào)比Broker維護(hù)的序號(hào)差值比一大,說明中間有數(shù)據(jù)尚未寫入,即亂序,此時(shí)Broker拒絕該消息,Producer拋出InvalidSequenceNumber 如果消息序號(hào)小于等于Broker維護(hù)的序號(hào),說明該消息已被保存,即為重復(fù)消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber Sender發(fā)送失敗后會(huì)重試,這樣可以保證每個(gè)消息都被發(fā)送到broker

多分區(qū)

Kafka本身無法保障多分區(qū)的有序性,可以通過業(yè)務(wù)設(shè)計(jì)進(jìn)行保證,例如需要單表數(shù)據(jù)通過自定義partition的方式發(fā)送至同一個(gè)分區(qū)

31、Kafka精確一次性(Exactly-once)如何保證

宏觀上:可靠性 + at least once + 冪等性

具體實(shí)現(xiàn):Kafka不丟消息-生產(chǎn)者冪等性-消費(fèi)者冪等性

詳見目錄:

12、Kafka可靠性保證(不丟消息)

18、如何保證消息不被重復(fù)消費(fèi)(消費(fèi)者冪等性)

24、談?wù)勀銓?duì)Kafka生產(chǎn)者冪等性的了解?

http://aloenet.com.cn/news/35863.html

相關(guān)文章:

  • 冬創(chuàng)網(wǎng)站建設(shè)培訓(xùn)中心網(wǎng)絡(luò)營(yíng)銷策劃內(nèi)容
  • 網(wǎng)站改版 更換服務(wù)器 排名丟失網(wǎng)站推廣建站
  • 品牌網(wǎng)站建設(shè)搭建優(yōu)化排名推廣關(guān)鍵詞
  • app軟件開發(fā)平臺(tái)游戲seo快速排名軟件方案
  • 想招聘員工去哪個(gè)網(wǎng)站手機(jī)網(wǎng)站建設(shè)案例
  • 下載學(xué)校網(wǎng)站模板下載51網(wǎng)站統(tǒng)計(jì)
  • 簡(jiǎn)單的網(wǎng)站設(shè)計(jì)圖aso關(guān)鍵詞優(yōu)化工具
  • 東莞長(zhǎng)安網(wǎng)站建設(shè)鄭州網(wǎng)絡(luò)營(yíng)銷
  • 榆林做網(wǎng)站多少錢seo顧問服
  • 注冊(cè)網(wǎng)站做推廣大澤山seo快速排名
  • 怎么看網(wǎng)站有沒有做301跳轉(zhuǎn)網(wǎng)上銷售渠道
  • 動(dòng)漫設(shè)計(jì)制作專業(yè)學(xué)什么seo關(guān)鍵詞排名優(yōu)化評(píng)價(jià)
  • 建設(shè)一個(gè)怎樣的自己的網(wǎng)站濟(jì)南競(jìng)價(jià)托管公司
  • wordpress建立網(wǎng)站寧波網(wǎng)站seo診斷工具
  • 小程序定制開發(fā)網(wǎng)站百度網(wǎng)址是什么
  • 網(wǎng)站壓縮山西網(wǎng)絡(luò)營(yíng)銷seo
  • 做提升自己的網(wǎng)站汕頭自動(dòng)seo
  • 織夢(mèng)網(wǎng)站模板怎么做搜索引擎seo外包
  • 網(wǎng)站會(huì)員模板網(wǎng)站關(guān)鍵詞推廣價(jià)格
  • vultr 做網(wǎng)站搜索引擎優(yōu)化的完整過程
  • vs2012手機(jī)網(wǎng)站開發(fā)教程常用的五種網(wǎng)絡(luò)營(yíng)銷工具
  • 工藝禮品東莞網(wǎng)站建設(shè)seoul national university
  • asp網(wǎng)站制作實(shí)例教程目前網(wǎng)絡(luò)推廣平臺(tái)
  • 天寧寺網(wǎng)站建設(shè)seo學(xué)校培訓(xùn)
  • 站長(zhǎng)工具綜合查詢ip怎樣在百度上發(fā)布作品
  • 怎么做提取微信62的網(wǎng)站網(wǎng)頁制作流程
  • 網(wǎng)站的內(nèi)連接如何做沈陽優(yōu)化網(wǎng)站公司
  • 怎么通過域名做網(wǎng)站年度關(guān)鍵詞有哪些
  • 在那個(gè)網(wǎng)站做義工好河南網(wǎng)站建設(shè)定制
  • 江蘇專業(yè)的網(wǎng)站建設(shè)一點(diǎn)優(yōu)化