国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

大型門戶網(wǎng)站建設(shè)需要哪些技術(shù)百度號(hào)碼認(rèn)證平臺(tái)官網(wǎng)首頁

大型門戶網(wǎng)站建設(shè)需要哪些技術(shù),百度號(hào)碼認(rèn)證平臺(tái)官網(wǎng)首頁,超鏈接到網(wǎng)站怎么做,wordpress og背景 當(dāng)flink消費(fèi)kafka的消息時(shí),我們經(jīng)常會(huì)用到FlinkKafkaConsumer進(jìn)行水位線的發(fā)送,本文就從源碼看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位線發(fā)送的流程 FlinkKafkaConsumer水位線發(fā)送 1.首先從Fetcher類開始&#xff0c…

背景

當(dāng)flink消費(fèi)kafka的消息時(shí),我們經(jīng)常會(huì)用到FlinkKafkaConsumer進(jìn)行水位線的發(fā)送,本文就從源碼看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位線發(fā)送的流程

FlinkKafkaConsumer水位線發(fā)送

1.首先從Fetcher類開始,創(chuàng)建Fetcher類的時(shí)候會(huì)構(gòu)建一個(gè)周期性的水位線發(fā)送線程并啟動(dòng)

        // if we have periodic watermarks, kick off the interval schedulerif (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {PeriodicWatermarkEmitter<T, KPH> periodicEmitter =new PeriodicWatermarkEmitter<>(checkpointLock,subscribedPartitionStates,watermarkOutputMultiplexer,processingTimeProvider,autoWatermarkInterval);periodicEmitter.start();}

2.隨后,PeriodicWatermarkEmitter中注冊處理時(shí)間定時(shí)器,周期性執(zhí)行

        public void start() {timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}@Overridepublic void onProcessingTime(long timestamp) {synchronized (checkpointLock) {for (KafkaTopicPartitionState<?, ?> state : allPartitions) {// 這里當(dāng)前算子任務(wù)消費(fèi)的kafka 分區(qū)分別記錄每個(gè)分區(qū)的水位值state.onPeriodicEmit();}//這里當(dāng)前算子會(huì)把自己消費(fèi)的kafka分區(qū)的所有水位線取最小值后當(dāng)成當(dāng)前算子任務(wù)自身的水位線發(fā)送出去,注意這里是當(dāng)前算子任務(wù)級(jí)別的watermarkOutputMultiplexer.onPeriodicEmit();}// schedule the next watermarktimerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}}

3.對應(yīng)state.onPeriodicEmit();記錄每個(gè)kafka分區(qū)的水位線方法

    @Overridepublic void onPeriodicEmit(WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}
其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代碼如下:public DeferredOutput(OutputState state) {this.state = state;}@Overridepublic void emitWatermark(Watermark watermark) {state.setWatermark(watermark.getTimestamp());}
所以這里最終效果只是對應(yīng)state(kafka分區(qū)[注意,一個(gè)算子任務(wù)有可能消費(fèi)好幾個(gè)kafka分區(qū)])上設(shè)置了水位線/*** Returns true if the watermark was advanced, that is if the new watermark is larger than* the previous one.** <p>Setting a watermark will clear the idleness flag.*/public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;// 這里也可以看出來,即使代碼里面發(fā)送了更小值的水位線,水位線也不會(huì)回退this.watermark = Math.max(watermark, this.watermark);return updated;}        

4.對應(yīng)算子任務(wù)組合當(dāng)前任務(wù)消費(fèi)的所有分區(qū)水位線的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit that// 如果算子任務(wù)不消費(fèi)任何分區(qū),它不會(huì)發(fā)出任何水位線,這里是不是就是kafka消費(fèi)者要小于kafka主題的原因所在???if (!hasOutputs) {return;}if (allIdle) {// 如果當(dāng)前算子任務(wù)處于空閑時(shí)間,標(biāo)識(shí)空閑,以便后續(xù)算子可以繼續(xù)推進(jìn)underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}```
http://aloenet.com.cn/news/33351.html

相關(guān)文章:

  • 西安做網(wǎng)站需要多少錢京東seo搜索優(yōu)化
  • 站長網(wǎng)seo綜合查詢工具百度托管公司
  • 做網(wǎng)站優(yōu)化有用嗎百度廣告公司聯(lián)系方式
  • 電腦手機(jī)網(wǎng)站制作網(wǎng)站免費(fèi)優(yōu)化
  • ps做網(wǎng)站的流程2023年8月份新冠
  • 專業(yè)網(wǎng)絡(luò)推廣公司排名北京推廣優(yōu)化經(jīng)理
  • 成都網(wǎng)站注冊域名注冊后如何建網(wǎng)站
  • 點(diǎn)擊未來網(wǎng)站建設(shè)游戲代理
  • 新手做網(wǎng)站的詳細(xì)步驟網(wǎng)站友鏈
  • 北京做網(wǎng)站定制價(jià)格seo診斷服務(wù)
  • 1688運(yùn)營自學(xué)全套教程seo網(wǎng)站推廣工具
  • 蘇州吳中區(qū)做網(wǎng)站新東方教育培訓(xùn)機(jī)構(gòu)官網(wǎng)
  • wordpress恢復(fù)分類目錄seo營銷論文
  • 自動(dòng)化東莞網(wǎng)站建設(shè)北京疫情最新消息
  • 網(wǎng)站開發(fā)視頻壓縮上傳seo資源
  • 旅游網(wǎng)站在提高用戶體驗(yàn)方面應(yīng)做哪些工作長春網(wǎng)站建設(shè)制作
  • 做吃的教程網(wǎng)站品牌整合營銷方案
  • 典型網(wǎng)站開發(fā)的一般流程推廣app是什么工作
  • 好看網(wǎng)站手機(jī)版批量查詢權(quán)重
  • 網(wǎng)站建設(shè)php帶數(shù)據(jù)庫模板網(wǎng)絡(luò)安全
  • 如何做網(wǎng)站鏡像百度鏈接提交入口
  • 手機(jī)網(wǎng)站有哪些類型成都網(wǎng)絡(luò)推廣
  • 網(wǎng)站建設(shè) 開發(fā)網(wǎng)站代碼百度網(wǎng)盤官網(wǎng)
  • 怎么用家里的電腦做網(wǎng)站服務(wù)器上海seo公司排名
  • 學(xué)校網(wǎng)站建設(shè)的優(yōu)勢和不足成人用品推廣網(wǎng)頁
  • 陽春網(wǎng)站制作寧波網(wǎng)站建設(shè)推廣平臺(tái)
  • 個(gè)人網(wǎng)站 備案 廣告seo搜索引擎優(yōu)化價(jià)格
  • 72搭建網(wǎng)站網(wǎng)頁代引流推廣公司
  • 想開個(gè)網(wǎng)站賣衣服的怎么做常州seo收費(fèi)
  • 有交做拼多多網(wǎng)站的嗎產(chǎn)品推廣平臺(tái)有哪些