免費(fèi)商城系統(tǒng)下載福建網(wǎng)絡(luò)seo關(guān)鍵詞優(yōu)化教程
Flink寫入Kafka兩階段提交
端到端的 exactly-once(精準(zhǔn)一次)
kafka -> Flink -> kafka
1)輸入端
輸入數(shù)據(jù)源端的 Kafka 可以對(duì)數(shù)據(jù)進(jìn)行持久化保存,并可以重置偏移量(offset)
2)Flink內(nèi)部
Flink 內(nèi)部可以通過檢查點(diǎn)機(jī)制保證狀態(tài)和處理結(jié)果的 exactly-once 語義
3)輸出端
兩階段提交(2PC)
。
寫入 Kafka 的過程實(shí)際上是一個(gè)兩段式的提交:處理完畢得到結(jié)果,寫入 Kafka 時(shí)是基于事務(wù)的“預(yù)提交”
;等到檢查點(diǎn)保存完畢,才會(huì)提交事務(wù)進(jìn)行“正式提交”
。
如果中間出現(xiàn)故障,事務(wù)進(jìn)行回滾,預(yù)提交就會(huì)被放棄;恢復(fù)狀態(tài)之后,也只能恢復(fù)所有已經(jīng)確認(rèn)提交的操作。
必須的配置
1)必須啟用檢查點(diǎn)
2)指定 KafkaSink 的發(fā)送級(jí)別為 DeliveryGuarantee.EXACTLY_ONCE
3)配置 Kafka 讀取數(shù)據(jù)的消費(fèi)者的隔離級(jí)別
【默認(rèn)kafka消費(fèi)者隔離級(jí)別是讀未提交
,2PC第一階段預(yù)提交數(shù)據(jù)也會(huì)被讀到,下游消費(fèi)者需要設(shè)置為讀已提交
】
4)事務(wù)超時(shí)配置
【配置的事務(wù)超時(shí)時(shí)間 transaction.timeout.ms 默認(rèn)是1小時(shí),而Kafka 集群配置的事務(wù)最大超時(shí)時(shí)間 transaction.max.timeout.ms 默認(rèn)是15 分鐘。在檢查點(diǎn)保存時(shí)間很長(zhǎng)時(shí),有可能出現(xiàn) Kafka 已經(jīng)認(rèn)為事務(wù)超時(shí)了,丟棄了預(yù)提交的數(shù)據(jù);而Sink任務(wù)認(rèn)為還可以繼續(xù)等待。如果接下來檢查點(diǎn)保存成功,發(fā)生故障后回滾到這個(gè)檢查點(diǎn)的狀態(tài),這部分?jǐn)?shù)據(jù)就被真正丟掉了。因此checkpoint 間隔 < 事務(wù)超時(shí)時(shí)間 < max的15分鐘
】
代碼實(shí)戰(zhàn)
kafka -> Flink -> kafka【Flink處理kafka來源數(shù)據(jù)再輸出到kafka】
public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 【1】、啟用檢查點(diǎn),設(shè)置為精準(zhǔn)一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.讀取 kafkaKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("topic_1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkasource = env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");/*3.寫出到 Kafka精準(zhǔn)一次 寫入 Kafka,需要滿足以下條件,【缺一不可】1、開啟 checkpoint2、sink 設(shè)置保證級(jí)別為 精準(zhǔn)一次3、sink 設(shè)置事務(wù)前綴4、sink 設(shè)置事務(wù)超時(shí)時(shí)間: checkpoint 間隔 < 事務(wù)超時(shí)時(shí)間 < max的15分鐘*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092")// 指定序列化器:指定 Topic 名稱、具體的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 【3.1】 精準(zhǔn)一次,開啟 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 【3.2】 精準(zhǔn)一次,必須設(shè)置 事務(wù)的前綴.setTransactionalIdPrefix("li-")// 【3.3】 設(shè)置事務(wù)超時(shí)時(shí)間.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();kafkasource.sinkTo(kafkaSink);env.execute();}
}
后續(xù)讀取“ws”這個(gè) topic 的消費(fèi)者,要設(shè)置事務(wù)的隔離級(jí)別為“讀已提交”
public class KafkaEOSConsumer {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 消費(fèi) 在前面使用【兩階段提交】寫入的 TopicKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setGroupId("default").setTopics("ws").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// 作為 下游的消費(fèi)者,要設(shè)置事務(wù)的隔離級(jí)別為 【讀已提交】.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed").build();env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource").print();env.execute();}
}
處理程序以及消費(fèi)程序如上設(shè)置才能真正實(shí)現(xiàn)端到端精準(zhǔn)一次的保證。