国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

免費(fèi)商城系統(tǒng)下載福建網(wǎng)絡(luò)seo關(guān)鍵詞優(yōu)化教程

免費(fèi)商城系統(tǒng)下載,福建網(wǎng)絡(luò)seo關(guān)鍵詞優(yōu)化教程,手機(jī)網(wǎng)站快速,網(wǎng)頁設(shè)計(jì)建站Flink寫入Kafka兩階段提交 端到端的 exactly-once(精準(zhǔn)一次) kafka -> Flink -> kafka 1)輸入端 輸入數(shù)據(jù)源端的 Kafka 可以對(duì)數(shù)據(jù)進(jìn)行持久化保存,并可以重置偏移量(offset) 2)Flink內(nèi)…

Flink寫入Kafka兩階段提交

端到端的 exactly-once(精準(zhǔn)一次)

kafka -> Flink -> kafka

1)輸入端

輸入數(shù)據(jù)源端的 Kafka 可以對(duì)數(shù)據(jù)進(jìn)行持久化保存,并可以重置偏移量(offset)

2)Flink內(nèi)部

Flink 內(nèi)部可以通過檢查點(diǎn)機(jī)制保證狀態(tài)和處理結(jié)果的 exactly-once 語義

3)輸出端

兩階段提交(2PC)

寫入 Kafka 的過程實(shí)際上是一個(gè)兩段式的提交:處理完畢得到結(jié)果,寫入 Kafka 時(shí)是基于事務(wù)的“預(yù)提交”;等到檢查點(diǎn)保存完畢,才會(huì)提交事務(wù)進(jìn)行“正式提交”

如果中間出現(xiàn)故障,事務(wù)進(jìn)行回滾,預(yù)提交就會(huì)被放棄;恢復(fù)狀態(tài)之后,也只能恢復(fù)所有已經(jīng)確認(rèn)提交的操作。

必須的配置

1)必須啟用檢查點(diǎn)

2)指定 KafkaSink 的發(fā)送級(jí)別為 DeliveryGuarantee.EXACTLY_ONCE

3)配置 Kafka 讀取數(shù)據(jù)的消費(fèi)者的隔離級(jí)別【默認(rèn)kafka消費(fèi)者隔離級(jí)別是讀未提交,2PC第一階段預(yù)提交數(shù)據(jù)也會(huì)被讀到,下游消費(fèi)者需要設(shè)置為讀已提交

4)事務(wù)超時(shí)配置

【配置的事務(wù)超時(shí)時(shí)間 transaction.timeout.ms 默認(rèn)是1小時(shí),而Kafka 集群配置的事務(wù)最大超時(shí)時(shí)間 transaction.max.timeout.ms 默認(rèn)是15 分鐘。在檢查點(diǎn)保存時(shí)間很長(zhǎng)時(shí),有可能出現(xiàn) Kafka 已經(jīng)認(rèn)為事務(wù)超時(shí)了,丟棄了預(yù)提交的數(shù)據(jù);而Sink任務(wù)認(rèn)為還可以繼續(xù)等待。如果接下來檢查點(diǎn)保存成功,發(fā)生故障后回滾到這個(gè)檢查點(diǎn)的狀態(tài),這部分?jǐn)?shù)據(jù)就被真正丟掉了。因此checkpoint 間隔 < 事務(wù)超時(shí)時(shí)間 < max的15分鐘

代碼實(shí)戰(zhàn)

kafka -> Flink -> kafka【Flink處理kafka來源數(shù)據(jù)再輸出到kafka】

public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 【1】、啟用檢查點(diǎn),設(shè)置為精準(zhǔn)一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.讀取 kafkaKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("topic_1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkasource = env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");/*3.寫出到 Kafka精準(zhǔn)一次 寫入 Kafka,需要滿足以下條件,【缺一不可】1、開啟 checkpoint2、sink 設(shè)置保證級(jí)別為 精準(zhǔn)一次3、sink 設(shè)置事務(wù)前綴4、sink 設(shè)置事務(wù)超時(shí)時(shí)間: checkpoint 間隔 < 事務(wù)超時(shí)時(shí)間 < max的15分鐘*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092")// 指定序列化器:指定 Topic 名稱、具體的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 【3.1】 精準(zhǔn)一次,開啟 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 【3.2】 精準(zhǔn)一次,必須設(shè)置 事務(wù)的前綴.setTransactionalIdPrefix("li-")// 【3.3】 設(shè)置事務(wù)超時(shí)時(shí)間.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();kafkasource.sinkTo(kafkaSink);env.execute();}
}

后續(xù)讀取“ws”這個(gè) topic 的消費(fèi)者,要設(shè)置事務(wù)的隔離級(jí)別為“讀已提交”

public class KafkaEOSConsumer {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 消費(fèi) 在前面使用【兩階段提交】寫入的 TopicKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("ws").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// 作為 下游的消費(fèi)者,要設(shè)置事務(wù)的隔離級(jí)別為 【讀已提交】.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed").build();env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource").print();env.execute();}
}

處理程序以及消費(fèi)程序如上設(shè)置才能真正實(shí)現(xiàn)端到端精準(zhǔn)一次的保證。

http://aloenet.com.cn/news/34072.html

相關(guān)文章:

  • 動(dòng)態(tài)網(wǎng)站建設(shè)案例教程下載男生短期培訓(xùn)就業(yè)
  • wordpress哪些文件需要給777成都網(wǎng)站seo技巧
  • php網(wǎng)站數(shù)據(jù)遷移鄭州網(wǎng)站建設(shè)七彩科技
  • 河南如何做網(wǎng)站seo優(yōu)化公司信
  • 青島旅游網(wǎng)站建設(shè)怎么制作一個(gè)網(wǎng)站5個(gè)網(wǎng)頁
  • 天津網(wǎng)絡(luò)關(guān)鍵詞排名石家莊seo網(wǎng)站排名
  • wordpress最新功能網(wǎng)站優(yōu)化技術(shù)
  • 哪個(gè)網(wǎng)站做首飾批發(fā)好百度網(wǎng)頁版登錄入口官網(wǎng)
  • 廣東東莞網(wǎng)站建設(shè)微信管理軟件哪個(gè)最好
  • wordpress標(biāo)簽別名轉(zhuǎn)換網(wǎng)絡(luò)seo招聘
  • 湖南哪里有做網(wǎng)站的愛站工具包手機(jī)版
  • 網(wǎng)站做廣告費(fèi)用關(guān)鍵詞調(diào)價(jià)工具哪個(gè)好
  • 邢臺(tái)網(wǎng)站建設(shè) 冀icp備信息流優(yōu)化師證書
  • 淄博企業(yè)網(wǎng)站建設(shè)自動(dòng)的網(wǎng)站設(shè)計(jì)制作
  • 網(wǎng)站年費(fèi)怎么做分錄十大免費(fèi)貨源網(wǎng)站免費(fèi)版本
  • 建立一個(gè)公司的網(wǎng)站嗎百度競(jìng)價(jià)點(diǎn)擊神器
  • 曲靖做網(wǎng)站價(jià)格超級(jí)seo外鏈
  • 網(wǎng)站開發(fā)課程設(shè)計(jì)參考文獻(xiàn)5118網(wǎng)站如何使用免費(fèi)版
  • 地方網(wǎng)站做的好的百度競(jìng)價(jià)產(chǎn)品
  • 網(wǎng)站建設(shè) 服務(wù)器友情鏈接工具
  • 外貿(mào)公司網(wǎng)站多少錢網(wǎng)頁制作作業(yè)100例
  • 深圳建站公司推薦seo網(wǎng)站建設(shè)是什么意思
  • 查看網(wǎng)站流量跨境電商平臺(tái)推廣
  • 合肥市中小企業(yè)局網(wǎng)站實(shí)時(shí)seo排名點(diǎn)擊軟件
  • 膠州專業(yè)建站湖北seo服務(wù)
  • 做外貿(mào)商城網(wǎng)站鄭州網(wǎng)絡(luò)推廣平臺(tái)
  • 厚街做網(wǎng)站公司代做網(wǎng)頁設(shè)計(jì)平臺(tái)
  • 探測(cè)網(wǎng)站是什么程序做的國(guó)內(nèi)10大搜索引擎
  • 網(wǎng)站數(shù)據(jù)分析指標(biāo)百度seo招聘
  • 網(wǎng)站建設(shè)是哪種發(fā)票中國(guó)十大品牌策劃公司