国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁(yè) > news >正文

新網(wǎng)站百度搜不到上海搜索引擎優(yōu)化公司

新網(wǎng)站百度搜不到,上海搜索引擎優(yōu)化公司,網(wǎng)絡(luò)營(yíng)銷專業(yè)好不好,電商 做圖 網(wǎng)站架構(gòu)原理 一、高吞吐機(jī)制:Batch打包、緩沖區(qū)、acks 1. Kafka Producer怎么把消息發(fā)送給Broker集群的? 需要指定把消息發(fā)送到哪個(gè)topic去 首先需要選擇一個(gè)topic的分區(qū),默認(rèn)是輪詢來負(fù)載均衡,但是如果指定了一個(gè)分區(qū)key&#x…

架構(gòu)原理

一、高吞吐機(jī)制:Batch打包、緩沖區(qū)、acks

1. Kafka Producer怎么把消息發(fā)送給Broker集群的?

需要指定把消息發(fā)送到哪個(gè)topic去

首先需要選擇一個(gè)topic的分區(qū),默認(rèn)是輪詢來負(fù)載均衡,但是如果指定了一個(gè)分區(qū)key,那么根據(jù)這個(gè)key的hash值來分發(fā)到指定的分區(qū),這樣可以讓相同的key分發(fā)到同一個(gè)分區(qū)里去,還可以自定義partitioner來實(shí)現(xiàn)分區(qū)策略

producer.send(msg); // 用類似這樣的方式去發(fā)送消息,就會(huì)把消息給你均勻的分布到各個(gè)分區(qū)上去
producer.send(key, msg); // 訂單id,或者是用戶id,他會(huì)根據(jù)這個(gè)key的hash值去分發(fā)到某個(gè)分區(qū)上去,他可以保證相同的key會(huì)路由分發(fā)到同一個(gè)分區(qū)上去

知道要發(fā)送到哪個(gè)分區(qū)之后,還得找到這個(gè)分區(qū)的leader副本所在的機(jī)器,然后跟那個(gè)機(jī)器上的Broker通過Socket建立連接來進(jìn)行通信,發(fā)送Kafka自定義協(xié)議格式的請(qǐng)求過去,把消息就帶過去了

如果找到了partition的leader所在的broker之后,就可以通過socket跟那臺(tái)broker建立連接,接著發(fā)送消息過去

Producer(生產(chǎn)者客戶端),起碼要知道兩個(gè)元數(shù)據(jù),每個(gè)topic有幾個(gè)分區(qū),每個(gè)分區(qū)的leader是在哪臺(tái)broker上,會(huì)自己從broker上拉取kafka集群的元數(shù)據(jù),緩存在自己client本地客戶端上

kafka使用者的層面來考慮一下,我如果要把數(shù)據(jù)寫入kafka集群,應(yīng)該如何來做,怎么把數(shù)據(jù)寫入kafka集群,以及他背后的一些原理還有使用過程中需要設(shè)置的一些參數(shù),到底應(yīng)該怎么來弄

在這里插入圖片描述

2. 用一張圖告訴你Producer發(fā)送消息的內(nèi)部實(shí)現(xiàn)原理

每次發(fā)送消息都必須先把數(shù)據(jù)封裝成一個(gè)ProducerRecord對(duì)象,里面包含了要發(fā)送的topic,具體在哪個(gè)分區(qū),分區(qū)key,消息內(nèi)容,timestamp時(shí)間戳,然后這個(gè)對(duì)象交給序列化器,變成自定義協(xié)議格式的數(shù)據(jù)

接著把數(shù)據(jù)交給partitioner分區(qū)器,對(duì)這個(gè)數(shù)據(jù)選擇合適的分區(qū),默認(rèn)就輪詢所有分區(qū),或者根據(jù)key來hash路由到某個(gè)分區(qū),這個(gè)topic的分區(qū)信息,都是在客戶端會(huì)有緩存的,當(dāng)然會(huì)提前跟broker去獲取

接著這個(gè)數(shù)據(jù)會(huì)被發(fā)送到producer內(nèi)部的一塊緩沖區(qū)里

然后producer內(nèi)部有一個(gè)Sender線程,會(huì)從緩沖區(qū)里提取消息封裝成一個(gè)一個(gè)的batch,然后每個(gè)batch發(fā)送給分區(qū)的leader副本所在的broker
在這里插入圖片描述

3. 基于Java API寫一個(gè)Kafka Producer發(fā)送消息的代碼示例

package com.zhss.demo.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class ProducerDemo {public static void main(String[] args) throws Exception {Properties props = new Properties();// 這里可以配置幾臺(tái)broker即可,他會(huì)自動(dòng)從broker去拉取元數(shù)據(jù)進(jìn)行緩存props.put("bootstrap.servers", "hadoop03:9092,hadoop04:9092,hadoop05:9092");  // 這個(gè)就是負(fù)責(zé)把發(fā)送的key從字符串序列化為字節(jié)數(shù)組props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 這個(gè)就是負(fù)責(zé)把你發(fā)送的實(shí)際的message從字符串序列化為字節(jié)數(shù)組props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "-1");props.put("retries", 3);props.put("batch.size", 323840);props.put("linger.ms", 10);props.put("buffer.memory", 33554432);props.put("max.block.ms", 3000);// 創(chuàng)建一個(gè)Producer實(shí)例:線程資源,跟各個(gè)broker建立socket連接資源KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "test-key", "test-value");// 這是異步發(fā)送的模式producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null) {// 消息發(fā)送成功System.out.println("消息發(fā)送成功");  } else {// 消息發(fā)送失敗,需要重新發(fā)送}}});Thread.sleep(10 * 1000); // 這是同步發(fā)送的模式
//		producer.send(record).get(); // 你要一直等待人家后續(xù)一系列的步驟都做完,發(fā)送消息之后// 有了消息的回應(yīng)返回給你,你這個(gè)方法才會(huì)退出來producer.close();}}

4. 發(fā)送消息給Broker時(shí)遇到的各種異常該如何處理?

之前我們看到不管是異步還是同步,都可能讓你處理異常,常見的異常如下:

LeaderNotAvailableException:這個(gè)就是如果某臺(tái)機(jī)器掛了,此時(shí)leader副本不可用,會(huì)導(dǎo)致你寫入失敗,要等待其他follower副本切換為leader副本之后,才能繼續(xù)寫入,此時(shí)可以重試發(fā)送即可

如果說你平時(shí)重啟kafka的broker進(jìn)程,肯定會(huì)導(dǎo)致leader切換,一定會(huì)導(dǎo)致你寫入報(bào)錯(cuò),是LeaderNotAvailableException

NotControllerException:這個(gè)也是同理,如果說Controller所在Broker掛了,那么此時(shí)會(huì)有問題,需要等待Controller重新選舉,此時(shí)也是一樣就是重試即可

NetworkException:網(wǎng)絡(luò)異常,重試即可

我們之前配置了一個(gè)參數(shù),retries,他會(huì)自動(dòng)重試的,但是如果重試幾次之后還是不行,就會(huì)提供Exception給我們來處理了

5. 發(fā)送消息的緩沖區(qū)應(yīng)該如何優(yōu)化來提升發(fā)送的吞吐量?

buffer.memory:設(shè)置發(fā)送消息的緩沖區(qū),默認(rèn)值是33554432,就是32MB

如果發(fā)送消息出去的速度小于寫入消息進(jìn)去的速度,就會(huì)導(dǎo)致緩沖區(qū)寫滿,此時(shí)生產(chǎn)消息就會(huì)阻塞住,所以說這里就應(yīng)該多做一些壓測(cè),盡可能保證說這塊緩沖區(qū)不會(huì)被寫滿導(dǎo)致生產(chǎn)行為被阻塞住

compression.type,默認(rèn)是none,不壓縮,但是也可以使用lz4壓縮,效率還是不錯(cuò)的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會(huì)加大producer端的cpu開銷

6. 消息批量發(fā)送的核心參數(shù)batch.size是如何優(yōu)化吞吐量?

batch.size,設(shè)置meigebatch的大小,如果batch太小,會(huì)導(dǎo)致頻繁網(wǎng)絡(luò)請(qǐng)求,吞吐量下降;如果batch太大,會(huì)導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,而且會(huì)讓內(nèi)存緩沖區(qū)有很大壓力,過多數(shù)據(jù)緩沖在內(nèi)存里

默認(rèn)值是:16384,就是16kb,也就是一個(gè)batch滿了16kb就發(fā)送出去,一般在實(shí)際生產(chǎn)環(huán)境,這個(gè)batch的值可以增大一些來提升吞吐量,可以自己壓測(cè)一下

還有一個(gè)參數(shù),linger.ms,這個(gè)值默認(rèn)是0,意思就是消息必須立即被發(fā)送,但是這是不對(duì)的,一般設(shè)置一個(gè)100毫秒之類的,這樣的話就是說,這個(gè)消息被發(fā)送出去后進(jìn)入一個(gè)batch,如果100毫秒內(nèi),這個(gè)batch滿了16kb,自然就會(huì)發(fā)送出去

但是如果100毫秒內(nèi),batch沒滿,那么也必須把消息發(fā)送出去了,不能讓消息的發(fā)送延遲時(shí)間太長(zhǎng),也避免給內(nèi)存造成過大的一個(gè)壓力

7. 如何根據(jù)業(yè)務(wù)場(chǎng)景對(duì)消息大小以及請(qǐng)求超時(shí)進(jìn)行合理的設(shè)置?

max.request.size:這個(gè)參數(shù)用來控制發(fā)送出去的消息的大小,默認(rèn)是1048576字節(jié),也就1mb,這個(gè)一般太小了,很多消息可能都會(huì)超過1mb的大小,所以需要自己優(yōu)化調(diào)整,把他設(shè)置更大一些

你發(fā)送出去的一條大數(shù)據(jù),超大的JSON串,超過1MB,就不讓你發(fā)了

request.timeout.ms:這個(gè)就是說發(fā)送一個(gè)請(qǐng)求出去之后,他有一個(gè)超時(shí)的時(shí)間限制,默認(rèn)是30秒,如果30秒都收不到響應(yīng),那么就會(huì)認(rèn)為異常,會(huì)拋出一個(gè)TimeoutException來讓我們進(jìn)行處理

8. 基于Kafka內(nèi)核架構(gòu)原理深入分析acks參數(shù)到底是干嘛的

acks參數(shù),其實(shí)是控制發(fā)送出去的消息的持久化機(jī)制的

如果acks=0,那么producer根本不管寫入broker的消息到底成功沒有,發(fā)送一條消息出去,立馬就可以發(fā)送下一條消息,這是吞吐量最高的方式,但是可能消息都丟失了,你也不知道的,但是說實(shí)話,你如果真是那種實(shí)時(shí)數(shù)據(jù)流分析的業(yè)務(wù)和場(chǎng)景,就是僅僅分析一些數(shù)據(jù)報(bào)表,丟幾條數(shù)據(jù)影響不大的

會(huì)讓你的發(fā)送吞吐量會(huì)提升很多,你發(fā)送弄一個(gè)batch出,不需要等待人家leader寫成功,直接就可以發(fā)送下一個(gè)batch了,吞吐量很大的,哪怕是偶爾丟一點(diǎn)點(diǎn)數(shù)據(jù),實(shí)時(shí)報(bào)表,折線圖,餅圖

acks=all,或者acks=-1:這個(gè)leader寫入成功以后,必須等待其他ISR中的副本都寫入成功,才可以返回響應(yīng)說這條消息寫入成功了,此時(shí)你會(huì)收到一個(gè)回調(diào)通知

min.insync.replicas = 2,ISR里必須有2個(gè)副本,一個(gè)leader和一個(gè)follower,最最起碼的一個(gè),不能只有一個(gè)leader存活,連一個(gè)follower都沒有了

acks = -1,每次寫成功一定是leader和follower都成功才可以算做成功,leader掛了,follower上是一定有這條數(shù)據(jù),不會(huì)丟失

retries = Integer.MAX_VALUE,無限重試,如果上述兩個(gè)條件不滿足,寫入一直失敗,就會(huì)無限次重試,保證說數(shù)據(jù)必須成功的發(fā)送給兩個(gè)副本,如果做不到,就不停的重試,除非是面向金融級(jí)的場(chǎng)景,面向企業(yè)大客戶,或者是廣告計(jì)費(fèi),跟錢的計(jì)算相關(guān)的場(chǎng)景下,才會(huì)通過嚴(yán)格配置保證數(shù)據(jù)絕對(duì)不丟失

acks=1:只要leader寫入成功,就認(rèn)為消息成功了,默認(rèn)給這個(gè)其實(shí)就比較合適的,還是可能會(huì)導(dǎo)致數(shù)據(jù)丟失的,如果剛寫入leader,leader就掛了,此時(shí)數(shù)據(jù)必然丟了,其他的follower沒收到數(shù)據(jù)副本,變成leader

9. 針對(duì)瞬間異常的消息重試參數(shù)有哪些需要考慮的點(diǎn)

有的時(shí)候一些leader切換之類的問題,需要進(jìn)行重試,設(shè)置retries即可,而且還可以跟消息不丟失結(jié)合起來,但是消息重試會(huì)導(dǎo)致重復(fù)發(fā)送的問題,比如說網(wǎng)絡(luò)抖動(dòng)一下導(dǎo)致他以為沒成功,就重試了,其實(shí)人家都成功了

所以消息重試導(dǎo)致的消費(fèi)重復(fù),需要你在下游consumer做冪等性處理,但是kafka已經(jīng)支持了一次且僅一次的消息語義

另外一個(gè),消息重試是可能導(dǎo)致消息的亂序的,因?yàn)榭赡芘旁谀愫竺娴南⒍及l(fā)送出去了,你現(xiàn)在收到回調(diào)失敗了才在重試,此時(shí)消息就會(huì)亂序,所以可以使用“max.in.flight.requests.per.connection”參數(shù)設(shè)置為1,這樣可以保證producer同一時(shí)間只能發(fā)送一條消息

兩次重試的間隔默認(rèn)是100毫秒,用“retry.backoff.ms”來進(jìn)行設(shè)置

一般來說,某臺(tái)broker重啟導(dǎo)致的leader切換,是最常見的異常,所以盡可能把重試次數(shù)和間隔,設(shè)置的可以cover住新leader切換過來

10. Kafka Producer高階用法(一):自定義分區(qū)

public class HotDataPartitioner implements Partitioner {private Random random;@Override
public void configure(Map<String, ?> configs) {
random = new Random();
}@Override
public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String key = (String)keyObj;
List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
int partitionCount = partitionInfoList.size();
int hotDataPartition = partitionCount - 1;
return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
}}props.put(“partitioner.class, “com.zhss.HotDataPartitioner”);測(cè)試發(fā)送bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test-topic

11. Kafka Producer高階用法(二):自定義序列化

12. Kafka Producer高階用法(三):自定義攔截器

二、Kafka Consumer選舉與Rebalance實(shí)現(xiàn)原理

1. 一張圖畫清Kafka基于Consumer Group的消費(fèi)者組的模型

每個(gè)consumer都要屬于一個(gè)consumer.group,就是一個(gè)消費(fèi)組,topic的一個(gè)分區(qū)只會(huì)分配給一個(gè)消費(fèi)組下的一個(gè)consumer來處理,每個(gè)consumer可能會(huì)分配多個(gè)分區(qū),也有可能某個(gè)consumer沒有分配到任何分區(qū)

分區(qū)內(nèi)的數(shù)據(jù)是保證順序性的

group.id = “membership-consumer-group”

如果你希望實(shí)現(xiàn)一個(gè)廣播的效果,你的每臺(tái)機(jī)器都要消費(fèi)到所有的數(shù)據(jù),每臺(tái)機(jī)器啟動(dòng)的時(shí)候,group.id可以是一個(gè)隨機(jī)生成的UUID也可以,你只要讓不同的機(jī)器的KafkaConsumer的group.id是不一樣的

如果consumer group中某個(gè)消費(fèi)者掛了,此時(shí)會(huì)自動(dòng)把分配給他的分區(qū)交給其他的消費(fèi)者,如果他又重啟了,那么又會(huì)把一些分區(qū)重新交還給他,這個(gè)就是所謂的消費(fèi)者rebalance的過程

2. 消費(fèi)者offset的記錄方式以及基于內(nèi)部topic的提交模式

每個(gè)consumer內(nèi)存里數(shù)據(jù)結(jié)構(gòu)保存對(duì)每個(gè)topic的每個(gè)分區(qū)的消費(fèi)offset,定期會(huì)提交offset,老版本是寫入zk,但是那樣高并發(fā)請(qǐng)求zk是不合理的架構(gòu)設(shè)計(jì),zk是做分布式系統(tǒng)的協(xié)調(diào)的,輕量級(jí)的元數(shù)據(jù)存儲(chǔ),不能負(fù)責(zé)高并發(fā)讀寫,作為數(shù)據(jù)存儲(chǔ)

所以后來就是提交offset發(fā)送給內(nèi)部topic:__consumer_offsets,提交過去的時(shí)候,key是group.id+topic+分區(qū)號(hào),value就是當(dāng)前offset的值,每隔一段時(shí)間,kafka內(nèi)部會(huì)對(duì)這個(gè)topic進(jìn)行compact

也就是每個(gè)group.id+topic+分區(qū)號(hào)就保留最新的那條數(shù)據(jù)即可

而且因?yàn)檫@個(gè)__consumer_offsets可能會(huì)接收高并發(fā)的請(qǐng)求,所以默認(rèn)分區(qū)50個(gè),這樣如果你的kafka部署了一個(gè)大的集群,比如有50臺(tái)機(jī)器,就可以用50臺(tái)機(jī)器來抗offset提交的請(qǐng)求壓力,就好很多
在這里插入圖片描述

3. 基于Java API寫一個(gè)Kafka Consumer消費(fèi)消息的代碼示例

String topicName = “test-topic”;
String groupId = “test-group”;Properties props = new Properties();
props.put(“bootstrap.servers”, “l(fā)ocalhost:9092);
props.put(“group.id”, “groupId”);
props.put(“enable.auto.commit”,true);
props.put(“auto.commit.ineterval.ms”,1000);
// 每次重啟都是從最早的offset開始讀取,不是接著上一次
props.put(“auto.offset.reset”, “earliest”); 
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.SttringDeserializer”);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000); // 超時(shí)時(shí)間
for(ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() +,+ record.key() +,+ record.value());
}
}
} catch(Exception e) {}

4. Kafka感知消費(fèi)者故障是通過哪三個(gè)參數(shù)來實(shí)現(xiàn)的?

heartbeat.interval.ms:consumer心跳時(shí)間,必須得保持心跳才能知道consumer是否故障了,然后如果故障之后,就會(huì)通過心跳下發(fā)rebalance的指令給其他的consumer通知他們進(jìn)行rebalance的操作

session.timeout.ms:kafka多長(zhǎng)時(shí)間感知不到一個(gè)consumer就認(rèn)為他故障了,默認(rèn)是10秒

max.poll.interval.ms:如果在兩次poll操作之間,超過了這個(gè)時(shí)間,那么就會(huì)認(rèn)為這個(gè)consume處理能力太弱了,會(huì)被踢出消費(fèi)組,分區(qū)分配給別人去消費(fèi),一遍來說結(jié)合你自己的業(yè)務(wù)處理的性能來設(shè)置就可以了

5. 對(duì)消息進(jìn)行消費(fèi)時(shí)有哪幾個(gè)參數(shù)需要注意以及設(shè)置呢?

fetch.max.bytes:獲取一條消息最大的字節(jié)數(shù),一般建議設(shè)置大一些

max.poll.records:一次poll返回消息的最大條數(shù),默認(rèn)是500條

connection.max.idle.ms:consumer跟broker的socket連接如果空閑超過了一定的時(shí)間,此時(shí)就會(huì)自動(dòng)回收連接,但是下次消費(fèi)就要重新建立socket連接,這個(gè)建議設(shè)置為-1,不要去回收

6. 消費(fèi)者offset相關(guān)的參數(shù)設(shè)置會(huì)對(duì)運(yùn)行產(chǎn)生什么樣的影響?

auto.offset.reset:這個(gè)參數(shù)的意思是,如果下次重啟,發(fā)現(xiàn)要消費(fèi)的offset不在分區(qū)的范圍內(nèi),就會(huì)重頭開始消費(fèi);但是如果正常情況下會(huì)接著上次的offset繼續(xù)消費(fèi)的

enable.auto.commit:這個(gè)就是開啟自動(dòng)提交位移

7. Group Coordinator是什么以及主要負(fù)責(zé)什么?

每個(gè)consumer group都會(huì)選擇一個(gè)broker作為自己的coordinator,他是負(fù)責(zé)監(jiān)控這個(gè)消費(fèi)組里的各個(gè)消費(fèi)者的心跳,以及判斷是否宕機(jī),然后開啟rebalance的,那么這個(gè)如何選擇呢?

就是根據(jù)group.id來進(jìn)行選擇,他有內(nèi)部的一個(gè)選擇機(jī)制,會(huì)給你挑選一個(gè)對(duì)應(yīng)的Broker,總會(huì)把你的各個(gè)消費(fèi)組均勻分配給各個(gè)Broker作為coordinator來進(jìn)行管理的

他負(fù)責(zé)的事情只要就是rebalance,說白了你的consumer group中的每個(gè)consumer剛剛啟動(dòng)就會(huì)跟選舉出來的這個(gè)consumer group對(duì)應(yīng)的coordinator所在的broker進(jìn)行通信,然后由coordinator分配分區(qū)給你的這個(gè)consumer來進(jìn)行消費(fèi)

coordinator會(huì)盡可能均勻的分配分區(qū)給各個(gè)consumer來消費(fèi)

8. 為消費(fèi)者選擇Coordinator的算法是如何實(shí)現(xiàn)的?

首先對(duì)groupId進(jìn)行hash,接著對(duì)__consumer_offsets的分區(qū)數(shù)量取模,默認(rèn)是50,可以通過offsets.topic.num.partitions來設(shè)置,找到你的這個(gè)consumer group的offset要提交到__consumer_offsets的哪個(gè)分區(qū)

比如說:groupId,“membership-consumer-group” -> hash值(數(shù)字)-> 對(duì)50取模 -> 就知道這個(gè)consumer group下的所有的消費(fèi)者提交offset的時(shí)候是往哪個(gè)分區(qū)去提交offset,大家可以找到__consumer_offsets的一個(gè)分區(qū)

__consumer_offset的分區(qū)的副本數(shù)量默認(rèn)來說1,只有一個(gè)leader

然后對(duì)這個(gè)分區(qū)找到對(duì)應(yīng)的leader所在的broker,這個(gè)broker就是這個(gè)consumer group的coordinator了,接著就會(huì)維護(hù)一個(gè)Socket連接跟這個(gè)Broker進(jìn)行通信

9. Coordinator和Consume Leader如何協(xié)作制定分區(qū)方案?

每個(gè)consumer都發(fā)送JoinGroup請(qǐng)求到Coordinator,然后Coordinator從一個(gè)consumer group中選擇一個(gè)consumer作為leader,把consumer group情況發(fā)送給這個(gè)leader,接著這個(gè)leader會(huì)負(fù)責(zé)制定分區(qū)方案,通過SyncGroup發(fā)給Coordinator

接著Coordinator就把分區(qū)方案下發(fā)給各個(gè)consumer,他們會(huì)從指定的分區(qū)的leader broker開始進(jìn)行socket連接以及消費(fèi)消息
在這里插入圖片描述

10. rebalance的三種策略分別有哪些優(yōu)劣勢(shì)?

這里有三種rebalance的策略:range、round-robin、sticky

0~8

order-topic-0
order-topic-1
order-topic-2

range策略就是按照partiton的序號(hào)范圍,比如partitioin02給一個(gè)consumer,partition35給一個(gè)consumer,partition6~8給一個(gè)consumer,默認(rèn)就是這個(gè)策略;

round-robin策略,就是輪詢分配,比如partiton0、3、6給一個(gè)consumer,partition1、4、7給一個(gè)consumer,partition2、5、8給一個(gè)consumer

但是上述的問題就在于說,可能在rebalance的時(shí)候會(huì)導(dǎo)致分區(qū)被頻繁的重新分配,比如說掛了一個(gè)consumer,然后就會(huì)導(dǎo)致partition04分配給第一個(gè)consumer,partition58分配給第二個(gè)consumer

這樣的話,原本是第二個(gè)consumer消費(fèi)的partition3~4就給了第一個(gè)consumer,實(shí)際上來說未必就很好

最新的一個(gè)sticky策略,就是說盡可能保證在rebalance的時(shí)候,讓原本屬于這個(gè)consumer的分區(qū)還是屬于他們,然后把多余的分區(qū)再均勻分配過去,這樣盡可能維持原來的分區(qū)分配的策略

consumer1:0~2 + 6~7
consumer2:3~5 + 8

11. Consumer內(nèi)部單線程處理一切事務(wù)的核心設(shè)計(jì)思想

其實(shí)就是在一個(gè)while循環(huán)里不停的去調(diào)用poll()方法,其實(shí)是我們自己的一個(gè)線程,就是我們自己的這個(gè)線程就是唯一的KafkaConsumer的工作線程,新版本的kafka api,簡(jiǎn)化,減少了線程數(shù)量

Consumer自己內(nèi)部就一個(gè)后臺(tái)線程,定時(shí)發(fā)送心跳給broker;但是其實(shí)負(fù)責(zé)進(jìn)行拉取消息、緩存消息、在內(nèi)存里更新offset、每隔一段時(shí)間提交offset、執(zhí)行rebalance這些任務(wù)的就一個(gè)線程,其實(shí)就是我們調(diào)用Consumer.poll()方法的那個(gè)線程

就一個(gè)線程調(diào)用進(jìn)去,會(huì)負(fù)責(zé)把所有的事情都干了

為什么叫做poll呢?因?yàn)榫褪悄憧梢员O(jiān)聽N多個(gè)Topic的消息,此時(shí)會(huì)跟集群里很多Kafka Broker維護(hù)一個(gè)Socket連接,然后每一次線程調(diào)用poll(),就會(huì)監(jiān)聽多個(gè)socket是否有消息傳遞過來

可能一個(gè)consumer會(huì)消費(fèi)很多個(gè)partition,每個(gè)partition其實(shí)都是leader可能在不同的broker上,那么如果consumer要拉取多個(gè)partition的數(shù)據(jù),就需要跟多個(gè)broker進(jìn)行通信,維護(hù)socket

每個(gè)socket就會(huì)跟一個(gè)broker進(jìn)行通信

每個(gè)Consumer內(nèi)部會(huì)維護(hù)多個(gè)Socket,負(fù)責(zé)跟多個(gè)Broker進(jìn)行通信,我們就一個(gè)工作線程每次調(diào)用poll()的時(shí)候,他其實(shí)會(huì)監(jiān)聽多個(gè)socket跟broker的通信,是否有新的數(shù)據(jù)可以去拉取
在這里插入圖片描述

12. 消費(fèi)過程中的各種offset之間的關(guān)系是什么?

上一次提交offset,當(dāng)前offset(還未提交),高水位offset,LEO

內(nèi)存里記錄這么幾個(gè)東西:上一次提交offset,當(dāng)前消費(fèi)到的offset,你不斷的在消費(fèi)消息,不停的在拉取新的消息,不停的更新當(dāng)前消費(fèi)的offset,HW offset,你拉取的時(shí)候,是只能看到HW他前面的數(shù)據(jù)

LEO,leader partition已經(jīng)更新到了一個(gè)offset了,但是HW在前面,你只能拉取到HW的數(shù)據(jù),HW后面的數(shù)據(jù),意味著不是所有的follower都寫入進(jìn)去了,所以不能去讀取的

13. 自動(dòng)提交offset的語義以及導(dǎo)致消息丟失和重復(fù)消費(fèi)的問題

默認(rèn)是自動(dòng)提交

auto.commit.inetrval.ms:5000,默認(rèn)是5秒提交一次

如果你提交了消費(fèi)到的offset之后,人家kafka broker就可以感知到了,比如你消費(fèi)到了offset = 56987,下次你的consumer再次重啟的時(shí)候,就會(huì)自動(dòng)從kafka broker感知到說自己上一次消費(fèi)到的offset = 56987

這次重啟之后,就繼續(xù)從offset = 56987這個(gè)位置繼續(xù)往后去消費(fèi)就可以了

他的語義是一旦消息給你poll到了之后,這些消息就認(rèn)為處理完了,后續(xù)就可以提交了,所以這里有兩種問題:

第一,消息丟失,如果你剛poll到消息,然后還沒來得及處理,結(jié)果人家已經(jīng)提交你的offset了,此時(shí)你如果consumer宕機(jī),再次重啟,數(shù)據(jù)丟失,因?yàn)樯弦淮蜗M(fèi)的那批數(shù)據(jù)其實(shí)你沒處理,結(jié)果人家認(rèn)為你處理了

poll到了一批數(shù)據(jù),offset = 65510~65532,人家剛好就是到了時(shí)間提交了offset,offset = 65532這個(gè)地方已經(jīng)提交給了kafka broker,接著你準(zhǔn)備對(duì)這批數(shù)據(jù)進(jìn)行消費(fèi),但是不巧的是,你剛要消費(fèi)就直接宕機(jī)了

其實(shí)你消費(fèi)到的數(shù)據(jù)是沒處理的,但是消費(fèi)offset已經(jīng)提交給kafka了,下次你重啟的時(shí)候,offset = 65533這個(gè)位置開始消費(fèi)的,之前的一批數(shù)據(jù)就丟失了

第二,重復(fù)消費(fèi),如果你poll到消息,都處理完畢了,此時(shí)還沒來得及提交offset,你的consumer就宕機(jī)了,再次重啟會(huì)重新消費(fèi)到這一批消息,再次處理一遍,那么就是有消息重復(fù)消費(fèi)的問題

poll到了一批數(shù)據(jù),offset = 65510~65532,你很快的處理完了,都寫入數(shù)據(jù)庫(kù)了,結(jié)果還沒來得及提交offset就宕機(jī)了,上一次提交的offset = 65509,重啟,他會(huì)再次讓你消費(fèi)offset = 65510~65532,一樣的數(shù)據(jù)再次重復(fù)消費(fèi)了一遍,寫入數(shù)據(jù)庫(kù)

重啟kafka consumer,修改了他的代碼

14. 如何實(shí)現(xiàn)Consumer Group的狀態(tài)機(jī)流轉(zhuǎn)機(jī)制?

剛開始Consumer Group狀態(tài)是:Empty

接著如果部分consumer發(fā)送了JoinGroup請(qǐng)求,會(huì)進(jìn)入:PreparingRebalance的狀態(tài),等待一段時(shí)間其他成員加入,這個(gè)時(shí)間現(xiàn)在默認(rèn)就是max.poll.interval.ms來指定的,所以這個(gè)時(shí)間間隔一般可以稍微大一點(diǎn)

接著如果所有成員都加入組了,就會(huì)進(jìn)入AwaitingSync狀態(tài),這個(gè)時(shí)候就不能允許任何一個(gè)consumer提交offset了,因?yàn)轳R上要rebalance了,進(jìn)行重新分配了,這個(gè)時(shí)候就會(huì)選擇一個(gè)leader consumer,由他來制定分區(qū)方案

然后leader consumer制定好了分區(qū)方案,SyncGroup請(qǐng)求發(fā)送給coordinator,他再下發(fā)方案給所有的consumer成員,此時(shí)進(jìn)入stable狀態(tài),都可以正?;趐oll來消費(fèi)了

所以如果說在stable狀態(tài)下,有consumer進(jìn)入組或者離開崩潰了,那么都會(huì)重新進(jìn)入PreparingRebalance狀態(tài),重新看看當(dāng)前組里有誰,如果剩下的組員都在,那么就進(jìn)入AwaitingSync狀態(tài)

leader consumer重新制定方案,然后再下發(fā)

15. 最新設(shè)計(jì)的rebalance分代機(jī)制可以有什么作用?

大家設(shè)想一個(gè)場(chǎng)景,在rebalance的時(shí)候,可能你本來消費(fèi)了partition3的數(shù)據(jù),結(jié)果有些數(shù)據(jù)消費(fèi)了還沒提交offset,結(jié)果此時(shí)rebalance,把partition3分配給了另外一個(gè)cnosumer了,此時(shí)你如果提交partition3的數(shù)據(jù)的offset,能行嗎?

必然不行,所以每次rebalance會(huì)觸發(fā)一次consumer group generation,分代,每次分代會(huì)加1,然后你提交上一個(gè)分代的offset是不行的,那個(gè)partiton可能已經(jīng)不屬于你了,大家全部按照新的partiton分配方案重新消費(fèi)數(shù)據(jù)

consumer group generation = 1
consumer group generation = 2

16. Consumer端的自定義反序列化器是什么?

17. 自行指定每個(gè)Consumer要消費(fèi)哪些分區(qū)有用嗎?

List partitions = consumer.partitionsFor(“order-topic”);

new TopicPartition(partitionInfo.topic(), partitionInfo.partition());

consumer.assign(partitions); //指定每個(gè)consumer要消費(fèi)哪些分區(qū),你就不是依靠consumer的自動(dòng)的分區(qū)分配方案來做了

18. 老版本的high-level consumer的實(shí)現(xiàn)原理是什么?

producer和consumer api原理,都是新版本的kafka api

老版本的kafka consumer api分成兩種,high-level和low-level,都是基于zk實(shí)現(xiàn)的,只不過前者有consumer group的概念,后者沒有

high-level的api,比如說consumer啟動(dòng)就是在zk里寫一個(gè)臨時(shí)節(jié)點(diǎn),但是如果自己宕機(jī)了,那么zk臨時(shí)節(jié)點(diǎn)就沒了,別人就會(huì)發(fā)現(xiàn),然后就會(huì)開啟rebalance

然后在消費(fèi)的時(shí)候,可以指定多個(gè)線程取消費(fèi)一個(gè)topic,比如說你和這個(gè)consumer分配到了5個(gè)分區(qū),那么你可以指定最多5個(gè)線程,每個(gè)線程消費(fèi)一個(gè)分區(qū)的數(shù)據(jù),但是新版本的就一個(gè)線程負(fù)責(zé)消費(fèi)所有分區(qū)

在提交offset,就是向zk寫入對(duì)某個(gè)分區(qū)現(xiàn)在消費(fèi)到了哪個(gè)offset了,默認(rèn)60秒才提交一次

新版本的api就不基于zk來實(shí)現(xiàn)了呢,zk主要是做輕量級(jí)的分布式協(xié)調(diào),元數(shù)據(jù)存儲(chǔ),并不適合高并發(fā)大量連接的場(chǎng)景,cnosumer可能有成百上千個(gè),成千上萬個(gè),zk來做的,連接的壓力,高并發(fā)的讀寫

broker內(nèi)部基于zk來進(jìn)行協(xié)調(diào)

19. 老版本的low-level consumer的實(shí)現(xiàn)原理是什么?

老版本的low-level消費(fèi)者,是可以自己控制offset的,實(shí)現(xiàn)很底層的一些控制,但是需要自己去提交offset,還要自己找到某個(gè)分區(qū)對(duì)應(yīng)的leader broker,跟他進(jìn)行連接獲取消息,如果leader變化了,也得自己處理,非常的麻煩

比如說storm-kafka這個(gè)插件,在storm消費(fèi)kafka數(shù)據(jù)的時(shí)候,就是使用的low-level api,自己獲取offset,提交寫入zk中自己指定的znode中,但是在未來基本上老版本的會(huì)越來越少使用

三、Kafka的時(shí)間輪延時(shí)調(diào)度機(jī)制與架構(gòu)原理總結(jié)

1. Producer的緩沖區(qū)內(nèi)部數(shù)據(jù)結(jié)構(gòu)是什么樣子的?

producer會(huì)創(chuàng)建一個(gè)accumulator緩沖區(qū),他里面是一個(gè)HashMap數(shù)據(jù)結(jié)構(gòu),每個(gè)分區(qū)都會(huì)對(duì)應(yīng)一個(gè)batch隊(duì)列,因?yàn)槟愦虬沙鰜淼腷atch,那必須是這個(gè)batch都是發(fā)往同一個(gè)分區(qū)的,這樣才能發(fā)送一個(gè)batch到這個(gè)分區(qū)的leader broker

{
“order-topic-0” -> [batch1, batch2],
“order-topic-1” -> [batch3]
}

batch.size

每個(gè)batch包含三個(gè)東西,一個(gè)是compressor,這是負(fù)責(zé)追加寫入batch的組件;第二個(gè)是batch緩沖區(qū),就是寫入數(shù)據(jù)的地方;第三個(gè)是thunks,就是每個(gè)消息都有一個(gè)回調(diào)Callback匿名內(nèi)部類的對(duì)象,對(duì)應(yīng)batch里每個(gè)消息的回調(diào)函數(shù)

每次寫入一條數(shù)據(jù)都對(duì)應(yīng)一個(gè)Callback回調(diào)函數(shù)的

2. 消息緩沖區(qū)滿的時(shí)候是阻塞住還是拋出異常?

max.block.ms,其實(shí)就是說如果寫緩沖區(qū)滿了,此時(shí)是阻塞住一段時(shí)間,然后什么時(shí)候拋異常,默認(rèn)是60000,也就是60秒

3. 負(fù)責(zé)IO請(qǐng)求的Sender線程是如何基于緩沖區(qū)發(fā)送數(shù)據(jù)的?

Sender線程會(huì)不停的輪詢緩沖區(qū)內(nèi)的HashMap,看batch是否滿了,或者是看linger.ms時(shí)間是不是到了,然后就得發(fā)送數(shù)據(jù)去,發(fā)送的時(shí)候會(huì)根據(jù)各個(gè)batch的目標(biāo)leader broker來進(jìn)行分組

因?yàn)榭赡懿煌腷atch是對(duì)應(yīng)不同的分區(qū),但是不同的分區(qū)的Leader是在一個(gè)broker上的,<Node, List>,接著會(huì)進(jìn)一步封裝為<Node, Request>,每個(gè)broker一次就是一個(gè)請(qǐng)求,但是這里可能包含很多個(gè)batch,接著就是將分組好的batch發(fā)送給leader broker,并且處理response,來反過來調(diào)用每個(gè)batch的callback函數(shù)

發(fā)送出去的Request會(huì)被放入InFlighRequests里面去保存,Map<NodeId, Deque>,這里就代表了發(fā)送出去的請(qǐng)求,但是還沒接收到響應(yīng)的

4. 同時(shí)可以接受有幾個(gè)發(fā)送到Broker的請(qǐng)求沒收到響應(yīng)?

Map<NodeId, Deque> => 給這個(gè)broker發(fā)送了哪些請(qǐng)求過去了?

max.in.flight.requests.per.connection:5

這個(gè)參數(shù)默認(rèn)值是5,默認(rèn)情況下,每個(gè)Broker最多只能有5個(gè)請(qǐng)求是發(fā)送出去但是還沒接收到響應(yīng)的,所以這種情況下是有可能導(dǎo)致順序錯(cuò)亂的,大家一定要搞清楚這一點(diǎn),先發(fā)送的請(qǐng)求可能后續(xù)要重發(fā)

5. Kafka自定義的基于TCP的二進(jìn)制協(xié)議深入探秘一番(一)

kafka自定義了一組二進(jìn)制的協(xié)議,現(xiàn)在一共是包含了43種協(xié)議類型,每種協(xié)議都有對(duì)應(yīng)的請(qǐng)求和響應(yīng),Request和Response,其實(shí)說白了,如果大家現(xiàn)在看咱們的那個(gè)自研分布式存儲(chǔ)系統(tǒng)的課,里面用到了gRPC

你大概可以認(rèn)為就是定義了43種接口,每個(gè)接口就是一種協(xié)議,然后每個(gè)接口都有自己對(duì)應(yīng)的Request和Response,就這個(gè)意思

每個(gè)協(xié)議的Request都有相同的請(qǐng)求頭(RequestHeader),也有不同的請(qǐng)求體(RequestBody),請(qǐng)求頭包含了:api_key、api_version、correlation_id、client_id,這里的api_key就類似于“PRODUCE”、“FETCH”,你可以認(rèn)為是接口的名字吧

“PRODUCE”就是發(fā)送消息的接口,“FETCH”就是拉取消息的接口,就這個(gè)意思

api_version,就是這個(gè)API的版本號(hào)

correlation_id,就是類似客戶端生成的一次請(qǐng)求的唯一標(biāo)志位,唯一標(biāo)識(shí)一次請(qǐng)求

client_id,就是客戶端的id

每個(gè)協(xié)議的Response也有相同的響應(yīng)頭,就是一個(gè)correlation_id,就是對(duì)某個(gè)請(qǐng)求的響應(yīng)

6. Kafka自定義的基于TCP的二進(jìn)制協(xié)議深入探秘一番(二)

比如說發(fā)送消息,就是ProduceRequest和ProduceResponse,代表“PRODUCE”這個(gè)接口的請(qǐng)求和響應(yīng),api_key=0,其實(shí)就是“PRODUCE”接口的代表

他的RequestBody,包含了:transactional_id,acks,timeout,topic_data(topic,data(partition,record_set)),acks就是客戶端自己指定的acks參數(shù),這個(gè)會(huì)指示leader和follower副本的寫入方式,timeout就是超時(shí)時(shí)間,默認(rèn)就是30秒,request.timeout.ms

然后就是要寫入哪個(gè)topic,哪個(gè)分區(qū),以及對(duì)應(yīng)數(shù)據(jù)集合,里面是多個(gè)batch

ProduceResponse,ResponseBody,包含了responses(topic,partition_responses(partition,error_code,base_offset,log_append_time,log_start_offset)),throttle_time_ms,簡(jiǎn)單來說就是當(dāng)前響應(yīng)是對(duì)哪個(gè)topic寫入的響應(yīng)

包含了每個(gè)topic的各個(gè)分區(qū)的響應(yīng),每個(gè)partition的寫入響應(yīng),包括error_code錯(cuò)誤碼,base_offset是消息集合的起始o(jì)ffset,log_append_time是寫入broker端的時(shí)間,log_start_offset是分區(qū)的起始o(jì)ffset

其實(shí)各種接口大體上來說就是如此,所以現(xiàn)在大家就知道了,協(xié)議就是一種規(guī)定,你發(fā)送過來的請(qǐng)求是什么格式的,他可能有請(qǐng)求頭還有請(qǐng)求體,分別包含哪些字段,按什么格式放數(shù)據(jù),響應(yīng)也是一樣的

然后大家就可以按一樣的協(xié)議來發(fā)送請(qǐng)求和接收響應(yīng)

7. 盤點(diǎn)一下在Broker內(nèi)部有哪些不同場(chǎng)景下會(huì)有延時(shí)任務(wù)?

比如說acks=-1,那么必須等待leader和follower都寫完才能返回響應(yīng),而且有一個(gè)超時(shí)時(shí)間,默認(rèn)是30秒,也就是request.timeout.ms,那么在寫入一條數(shù)據(jù)到leader磁盤之后,就必須有一個(gè)延時(shí)任務(wù),到期時(shí)間是30秒

延時(shí)任務(wù)會(huì)被放到DelayedOperationPurgatory,延時(shí)操作管理器中

這個(gè)延時(shí)任務(wù)如果因?yàn)樗衒ollower都寫入副本到本地磁盤了,那么就會(huì)被自動(dòng)觸發(fā)蘇醒,那么就可以返回響應(yīng)結(jié)果給客戶端了,否則的話,這個(gè)延時(shí)任務(wù)自己指定了最多是30秒到期,如果到了超時(shí)時(shí)間都沒等到,那么就直接超時(shí)返回異常了

還有一種是延時(shí)拉取任務(wù),也就是說follower往leader拉取消息的時(shí)候,如果發(fā)現(xiàn)是空的,那么此時(shí)會(huì)創(chuàng)建一個(gè)延時(shí)拉取任務(wù),然后延時(shí)時(shí)間到了之后,就會(huì)再次讀取一次消息,如果過程中l(wèi)eader寫入了消息那么也會(huì)自動(dòng)執(zhí)行這個(gè)拉取任務(wù)

8. Kafka的時(shí)間輪延時(shí)調(diào)度機(jī)制(一):O(1)時(shí)間復(fù)雜度

Kafka內(nèi)部有很多延時(shí)任務(wù),沒有基于JDK Timer來實(shí)現(xiàn),那個(gè)插入和刪除任務(wù)的時(shí)間復(fù)雜度是O(nlogn),而是基于了自己寫的時(shí)間輪來實(shí)現(xiàn)的,時(shí)間復(fù)雜度是O(1),其實(shí)Netty、ZooKeeper、Quartz很多中間件都會(huì)實(shí)現(xiàn)時(shí)間輪

延時(shí)任務(wù)是很多很多的,大量的發(fā)送消息以及拉取消息,都會(huì)涉及到延時(shí)任務(wù),任務(wù)數(shù)量很多,如果基于傳統(tǒng)的JDK Timer把大量的延時(shí)任務(wù)頻繁的插入和刪除,時(shí)間復(fù)雜度是O(nlogn)性能比較低的

時(shí)間輪的機(jī)制,延時(shí)任務(wù)插入和刪除,O(1)

簡(jiǎn)單來說,一個(gè)時(shí)間輪(TimerWheel)就是一個(gè)數(shù)組實(shí)現(xiàn)的存放定時(shí)任務(wù)的環(huán)形隊(duì)列,數(shù)組每個(gè)元素都是一個(gè)定時(shí)任務(wù)列表(TimerTaskList),這個(gè)TimerTaskList是一個(gè)環(huán)形雙向鏈表,鏈表里的每個(gè)元素都是定時(shí)任務(wù)(TimerTask)

時(shí)間輪是有很多個(gè)時(shí)間格的,一個(gè)時(shí)間格就是時(shí)間輪的時(shí)間跨度tickMs,wheelSize就是時(shí)間格的數(shù)量,時(shí)間輪的總時(shí)間跨度就是tickMs * wheelSize(interval),然后還有一個(gè)表盤指針(currentTime),就是時(shí)間輪當(dāng)前所處的時(shí)間

currentTime指向的時(shí)間格就是到期,需要執(zhí)行里面的定時(shí)任務(wù)

比如說tickMs = 1ms,wheelSize = 20,那么時(shí)間輪跨度(inetrval)就是20ms,剛開始currentTime = 0,這個(gè)時(shí)候如果有一個(gè)延時(shí)2ms之后執(zhí)行的任務(wù)插入進(jìn)來,就會(huì)基于數(shù)組的index直接定位到時(shí)間輪底層數(shù)組的第三個(gè)元素

因?yàn)閠ickMs = 1ms,所以第一個(gè)元素代表的是0ms,第二個(gè)元素代表的是1ms的地方,第三個(gè)元素代表的就是2ms的地方,直接基于數(shù)組來定位就是O(1)是吧,然后到數(shù)組之后把這個(gè)任務(wù)插入其中的雙向鏈表,這個(gè)時(shí)間復(fù)雜度也是O(1)

所以這個(gè)插入定時(shí)任務(wù)的時(shí)間復(fù)雜度就是O(1)

然后currentTime會(huì)隨著時(shí)間不斷的推移,1ms之后會(huì)指向第二個(gè)時(shí)間格,2ms之后會(huì)指向第三個(gè)時(shí)間格,這個(gè)時(shí)候就會(huì)執(zhí)行第三個(gè)時(shí)間格里剛才插入進(jìn)來要在2ms之后執(zhí)行的那個(gè)任務(wù)了

這個(gè)時(shí)候如果插入進(jìn)來一個(gè)8ms之后要執(zhí)行的任務(wù),那么就會(huì)放到第11個(gè)時(shí)間格上去,相比于currentTime剛好是8ms之后,對(duì)吧,就是個(gè)意思,然后如果是插入一個(gè)19ms之后執(zhí)行的呢?那就會(huì)放在第二個(gè)時(shí)間格

每個(gè)插入進(jìn)來的任務(wù),他都會(huì)依據(jù)當(dāng)前的currentTime來放,最后正好要讓currentTime轉(zhuǎn)動(dòng)這么多時(shí)間之后,正好可以執(zhí)行那個(gè)時(shí)間格里的任務(wù)

9. Kafka的時(shí)間輪延時(shí)調(diào)度機(jī)制(二):多層級(jí)時(shí)間輪

接著上一講的內(nèi)容,那如果這個(gè)時(shí)候來一個(gè)350毫秒之后執(zhí)行的定時(shí)任務(wù)呢?已經(jīng)超出當(dāng)前這個(gè)時(shí)間輪的范圍了,那么就放到上層時(shí)間輪,上層時(shí)間輪的tickMs就是下層時(shí)間輪的interval,也就是20ms

wheelSize是固定的,都是20,那么上層時(shí)間輪的inetrval周期就是400ms,如果再上一層的時(shí)間輪他的tickMs是400ms,那么interval周期就是8000ms,也就是8s,再上一層時(shí)間輪的tickMs是8s,interval就是160s,也就是好幾分鐘了,以此類推即可

反正有很多層級(jí)的時(shí)間輪,一個(gè)時(shí)間輪不夠,就往上開辟一個(gè)新的時(shí)間輪出來,每個(gè)時(shí)間輪的tickMs是下級(jí)時(shí)間輪的interval,而且currentTime就跟時(shí)鐘的指針一樣是不停的轉(zhuǎn)動(dòng)的,你只要根據(jù)定時(shí)周期把他放入對(duì)應(yīng)的輪子即可

每個(gè)輪子插入的時(shí)候根據(jù)currentTime,放到對(duì)應(yīng)時(shí)間之后的時(shí)間格即可

比如定時(shí)350ms后執(zhí)行的任務(wù),就可以放到interval位400ms的時(shí)間輪內(nèi),currentTime自然會(huì)轉(zhuǎn)動(dòng)到那個(gè)時(shí)間格來執(zhí)行他

10. Kafka的時(shí)間輪延時(shí)調(diào)度機(jī)制(三):時(shí)間輪層級(jí)的下滑

接著上一講的內(nèi)容,那如果這個(gè)時(shí)候來一個(gè)350毫秒之后執(zhí)行的定時(shí)任務(wù)呢?已經(jīng)超出當(dāng)前這個(gè)時(shí)間輪的范圍了,那么就放到上層時(shí)間輪,上層時(shí)間輪的tickMs就是下層時(shí)間輪的interval,也就是20ms

wheelSize是固定的,都是20,那么上層時(shí)間輪的inetrval周期就是400ms,如果再上一層的時(shí)間輪他的tickMs是400ms,那么interval周期就是8000ms,也就是8s,再上一層時(shí)間輪的tickMs是8s,interval就是160s,也就是好幾分鐘了,以此類推即可

反正有很多層級(jí)的時(shí)間輪,一個(gè)時(shí)間輪不夠,就往上開辟一個(gè)新的時(shí)間輪出來,每個(gè)時(shí)間輪的tickMs是下級(jí)時(shí)間輪的interval,而且currentTime就跟時(shí)鐘的指針一樣是不停的轉(zhuǎn)動(dòng)的,你只要根據(jù)定時(shí)周期把他放入對(duì)應(yīng)的輪子即可

每個(gè)輪子插入的時(shí)候根據(jù)currentTime,放到對(duì)應(yīng)時(shí)間之后的時(shí)間格即可

比如定時(shí)350ms后執(zhí)行的任務(wù),就可以放到interval位400ms的時(shí)間輪內(nèi),currentTime自然會(huì)轉(zhuǎn)動(dòng)到那個(gè)時(shí)間格來執(zhí)行他

11. Kafka的時(shí)間輪延時(shí)調(diào)度機(jī)制(四):基于DelayQueue推動(dòng)

基于數(shù)組和雙向鏈表來O(1)時(shí)間度可以插入任務(wù)

但是推進(jìn)時(shí)間輪怎么做呢?搞一個(gè)線程不停的空循環(huán)判斷是否進(jìn)入下一個(gè)時(shí)間格嗎?那樣很浪費(fèi)CPU資源,所以采取的是DelayQueue

每個(gè)時(shí)間輪里的TimerTaskList作為這個(gè)時(shí)間格的任務(wù)列表,都會(huì)插入DelayQueue中,設(shè)置一個(gè)延時(shí)出隊(duì)時(shí)間,DelayQueue會(huì)自動(dòng)把過期時(shí)間最短的排在隊(duì)頭,然后專門有一個(gè)線程來從DelayQueue里獲取到期任務(wù)列表

某個(gè)時(shí)間格對(duì)應(yīng)的TimerTaskList到期之后,就會(huì)被線程獲取到,這種方式就可以實(shí)現(xiàn)時(shí)間輪推進(jìn)的效果,推進(jìn)時(shí)間輪基于DelayQueue,時(shí)間復(fù)雜度也是O(1),因?yàn)橹灰獜年?duì)頭獲取即可
在這里插入圖片描述
在這里插入圖片描述

http://aloenet.com.cn/news/29664.html

相關(guān)文章:

  • a3電子報(bào)在什么網(wǎng)站做培訓(xùn)公司
  • 有一個(gè)網(wǎng)站是做釆購(gòu)的是什么網(wǎng)互聯(lián)網(wǎng)推廣怎么找客戶
  • wordpress地址如何修改福州seo顧問
  • 企業(yè)網(wǎng)站建設(shè)是什么網(wǎng)站關(guān)鍵詞排名分析
  • 米拓建設(shè)網(wǎng)站合肥做網(wǎng)絡(luò)推廣的公司
  • 網(wǎng)站開發(fā)需要python 嗎全國(guó)疫情最新消息今天實(shí)時(shí)
  • 電影網(wǎng)站制作模板搜索引擎營(yíng)銷的主要模式有哪些
  • wps wordpress廈門網(wǎng)站seo哪家好
  • 網(wǎng)站備案收費(fèi)么重慶企業(yè)免費(fèi)建站
  • 如何做網(wǎng)站診斷微信營(yíng)銷軟件哪個(gè)好用
  • 大連網(wǎng)站建設(shè)遼icp備如何做網(wǎng)站推廣
  • 網(wǎng)站改版如何做301免費(fèi)發(fā)布信息平臺(tái)有哪些
  • 做個(gè)網(wǎng)站大約多少錢產(chǎn)品推廣網(wǎng)站
  • 北京到安陽的火車票灰色行業(yè)關(guān)鍵詞優(yōu)化
  • 宿松做網(wǎng)站百度指數(shù)在線查詢小程序
  • 深藍(lán)企業(yè)管理咨詢有限公司網(wǎng)站關(guān)鍵字優(yōu)化價(jià)格
  • 廣德做網(wǎng)站網(wǎng)絡(luò)營(yíng)銷推廣及優(yōu)化方案
  • 蘇州響應(yīng)式網(wǎng)站建設(shè)市場(chǎng)營(yíng)銷產(chǎn)品推廣策劃方案
  • bootstrap 風(fēng)格網(wǎng)站百度指數(shù)明星搜索排名
  • 做網(wǎng)站單頁(yè)視頻谷歌關(guān)鍵詞優(yōu)化怎么做
  • 做網(wǎng)站只有域名關(guān)鍵詞搜索量排名
  • 深圳找人做網(wǎng)站aso優(yōu)化師
  • 圖庫(kù)網(wǎng)站源碼下載外貿(mào)網(wǎng)絡(luò)營(yíng)銷平臺(tái)
  • 滁州市大滁城建設(shè)網(wǎng)站章魚磁力鏈接引擎
  • 幫人代做靜態(tài)網(wǎng)站多少錢剛出來的新產(chǎn)品怎么推
  • 電商網(wǎng)站設(shè)計(jì)方案大全建立網(wǎng)站需要多少錢
  • 寧波建設(shè)監(jiān)理管理協(xié)會(huì)網(wǎng)站營(yíng)銷的手段和方法
  • 有edi證書可以做網(wǎng)站運(yùn)營(yíng)么巧克力軟文范例200字
  • 網(wǎng)站描述多個(gè)詞怎么分隔互聯(lián)網(wǎng)推廣銷售是做什么的
  • 傳奇怎么建設(shè)自己的網(wǎng)站怎樣在百度上建立網(wǎng)站