在線制作logo設(shè)計百度seo排名優(yōu)化系統(tǒng)
源算子Data Source
- 概述
- 內(nèi)置Data Source
- 基于集合構(gòu)建
- 基于文件構(gòu)建
- 基于Socket構(gòu)建
- 自定義Data Source
- SourceFunction
- RichSourceFunction
- 常見連接器
- 第三方系統(tǒng)連接器
- File Source連接器
- DataGen Source連接器
- Kafka Source連接器
- RabbitMQ Source連接器
- MongoDB Source連接器
概述
Flink中的Data Source(數(shù)據(jù)源、源算子)用于定義數(shù)據(jù)輸入的來源。數(shù)據(jù)源是Flink作業(yè)的起點,它可以從各種數(shù)據(jù)來源獲取數(shù)據(jù),例如文件系統(tǒng)、消息隊列、數(shù)據(jù)庫等。
將數(shù)據(jù)源添加到Flink執(zhí)行環(huán)境中,從而創(chuàng)建一個數(shù)據(jù)流。然后可以對該數(shù)據(jù)流應(yīng)用一系列轉(zhuǎn)換和操作,例如過濾、轉(zhuǎn)換、聚合、計算等。最后將結(jié)果寫入其他系統(tǒng),例如文件系統(tǒng)、數(shù)據(jù)庫、消息隊列等。
數(shù)據(jù)源是Flink作業(yè)中非常重要的組件,它確定了數(shù)據(jù)的來源和初始輸入,是構(gòu)建流處理和批處理作業(yè)的基礎(chǔ)。
內(nèi)置Data Source
Flink Data Source用于定義Flink程序的數(shù)據(jù)來源,Flink官方提供了多種數(shù)據(jù)獲取方法,用于幫助開發(fā)者簡單快速地構(gòu)建輸入流
基于集合構(gòu)建
可以將數(shù)據(jù)臨時存儲到內(nèi)存中,形成特殊的數(shù)據(jù)結(jié)構(gòu)后,作為數(shù)據(jù)源使用,比如采用集合類型。一般用來進行本地調(diào)試或者驗證。
fromCollection(Collection):基于集合構(gòu)建,集合中的所有元素必須是同一類型fromElements(T ...): 基于元素構(gòu)建,所有元素必須是同一類型generateSequence(from, to):基于給定的序列區(qū)間進行構(gòu)建fromCollection(Iterator, Class):基于迭代器進行構(gòu)建。第一個參數(shù)用于定義迭代器,第二個參數(shù)用于定義輸出元素的類型
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 基于元素構(gòu)建DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3, 4);// 基于集合構(gòu)建DataStreamSource<Integer> source2 = env.fromCollection(Arrays.asList(1, 2, 3, 4));// 基于給定的序列區(qū)間進行構(gòu)建env.generateSequence(0,100);// 基于迭代器進行構(gòu)建env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();source1.print();env.execute();}
自定義的迭代器CustomIterator,產(chǎn)生 1 到 100 區(qū)間內(nèi)的數(shù)據(jù)
注意: 自定義迭代器要實現(xiàn)Iterator接口外,還必須要實現(xiàn)序列化接口Serializable ,否則會拋出序列化失敗的異常
import java.io.Serializable;
import java.util.Iterator;public class CustomIterator implements Iterator<Integer>, Serializable {private Integer i = 0;@Overridepublic boolean hasNext() {return i < 100;}@Overridepublic Integer next() {i++;return i;}
}
基于文件構(gòu)建
在本地環(huán)境進行測試時可以方便地從本地文件讀取數(shù)據(jù)
readTextFile(path):按照TextInputFormat 格式讀取文本文件,并將其內(nèi)容以字符串的形式返回。示例如下:readFile(fileInputFormat, path) :按照指定格式讀取文件。readFile(inputFormat, filePath, watchType, interval, typeInformation):按照指定格式周期性的讀取文件。
各個參數(shù)含義:
inputFormat:數(shù)據(jù)流的輸入格式filePath:文件路徑,可以是本地文件系統(tǒng)上的路徑,也可以是HDFS上的文件路徑watchType:讀取方式,兩個可選值: 1.FileProcessingMode.PROCESS_ONCE: 表示對指定路徑上的數(shù)據(jù)只讀取一次,然后退出2.FileProcessingMode.PROCESS_CONTINUOUSLY: 表示對路徑進行定期地掃描和讀取。注意:當文件被修改時,其所有的內(nèi)容 (包含原有的內(nèi)容和新增的內(nèi)容) 都將被重新處理,因此這會打破Flink的exactly-once語義interval:定期掃描的時間間隔typeInformation:輸入流中元素的類型
public static void main(String[] args) throws Exception {String filePath = "data/test.text";// 創(chuàng)建執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env// 讀取文本文件,并將其內(nèi)容以字符串的形式返回.readTextFile(filePath).print();env.readFile(new TextInputFormat(new Path(filePath)), filePath, FileProcessingMode.PROCESS_ONCE, 1, BasicTypeInfo.STRING_TYPE_INFO).print();env.execute();}
基于Socket構(gòu)建
通過監(jiān)聽Socket端口,可以在本地很方便地模擬一個實時計算環(huán)境。
Flink提供了socketTextStream方法可以通過host和port從一個Socket中以文本的方式讀取數(shù)據(jù),以此構(gòu)建基于Socket的數(shù)據(jù)流
socketTextStream方法有以下四個主要參數(shù):
hostname:主機名port:端口號,設(shè)置為 0 時,表示端口號自動分配delimiter:用于分隔每條記錄的分隔符maxRetry:當Socket臨時關(guān)閉時,程序的最大重試間隔,單位為秒。設(shè)置為0時表示不進行重試;設(shè)置為負值則表示一直重試
示例如下:
env.socketTextStream("IP", 8888, "\n", 3).print();
讀取socket文本流,是流處理場景,這種方式由于吞吐量小、穩(wěn)定性較差,一般也是用于測試
// 創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);// 讀取socket文本流
DataStreamSource<String> socketDS = env.socketTextStream("IP", 8888);
注意:基于Socket構(gòu)建數(shù)據(jù)源,一般需要搭配Netcat使用。
Netcat(又稱為NC)是一個計算機網(wǎng)絡(luò)工具,它可以在兩臺計算機之間建立 TCP/IP 或 UDP 連接。它被廣泛用于測試網(wǎng)絡(luò)中的端口,發(fā)送文件等操作。使用 Netcat 可以輕松地進行網(wǎng)絡(luò)調(diào)試和探測,也可以進行加密連接和遠程管理等高級網(wǎng)絡(luò)操作。因為其功能強大而又簡單易用,所以在計算機安全領(lǐng)域也有著廣泛的應(yīng)用。
安裝nc命令
yum install -y nc
啟動socket端口
[root@master ~]# nc -l 8080
abc bcd cde
bcd cde fgh
cde fgh hij
注意:
測試時,先啟動端口,后啟動程序,會報超時連接異常,最后發(fā)送測試數(shù)據(jù)即可。
自定義Data Source
可以通過實現(xiàn)Flink的SourceFunction、ParallelSourceFunction、RichSourceFunction、RichParallelSourceFunction等類并重寫其方法以此實現(xiàn)自定義Data Source
ParallelSourceFunction、RichParallelSourceFunction分別與SourceFunction、RichSourceFunctio功能類似,只不過它們通過SourceContext發(fā)送的數(shù)據(jù)會自動分發(fā)到并行任務(wù)中去,也就是說具有并行度的功能。
SourceFunction
它是Flink 提供的基礎(chǔ)接口之一,用于定義數(shù)據(jù)源的行為。它包含一個 run 方法,該方法用于啟動數(shù)據(jù)源,并使用SourceContext來發(fā)送數(shù)據(jù)元素。它中的方法是生命周期很簡單的基礎(chǔ)方法。
操作步驟:
實現(xiàn)SourceFunction接口:創(chuàng)建一個實現(xiàn)SourceFunction接口的類,該接口定義讀取數(shù)據(jù)并發(fā)出數(shù)據(jù)流的方法。這個接口中的核心方法是run()和cancel(),其中run()方法用于讀取數(shù)據(jù)并發(fā)出一系列事件,cancel()方法用于取消數(shù)據(jù)源的運行實現(xiàn)run()方法:可以定義從數(shù)據(jù)源讀取數(shù)據(jù)的邏輯。這可以是從文件、數(shù)據(jù)庫、消息隊列等讀取數(shù)據(jù)的邏輯。在適當?shù)臅r候,使用collect()方法將讀取的數(shù)據(jù)發(fā)出到數(shù)據(jù)流中實現(xiàn)cancel()方法:可以編寫停止或清理數(shù)據(jù)源的邏輯。例如,如果數(shù)據(jù)源使用了外部資源,在這里釋放這些資源注冊數(shù)據(jù)源:將數(shù)據(jù)源注冊到Flink的執(zhí)行環(huán)境中,以便可以在作業(yè)中使用。通過執(zhí)行環(huán)境的addSource()方法,向執(zhí)行環(huán)境添加數(shù)據(jù)源
public class MySource implements SourceFunction<String> {private boolean isRunning = true;/*** run() 方法是核心方法,它會不斷地讀取、產(chǎn)生數(shù)據(jù)并將數(shù)據(jù)發(fā)送到下游* */@Overridepublic void run(SourceContext ctx) throws Exception {while (isRunning) {// 產(chǎn)生一些數(shù)據(jù)String data = UUID.randomUUID().toString();// 將數(shù)據(jù)發(fā)送到下游ctx.collect(data);// 每秒產(chǎn)生一條數(shù)據(jù)Thread.sleep(1000);}}/*** cancel() 方法用于在取消任務(wù)時清理資源*/@Overridepublic void cancel() {isRunning = false;}
}
將自定義的數(shù)據(jù)源傳遞給 env.addSource() 方法,并通過 .print() 將數(shù)據(jù)打印到控制臺中。最后調(diào)用 env.execute() 方法來啟動Flink程序。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 將自定義的數(shù)據(jù)源添加到 Flink 程序中DataStreamSource<String> streamSource = env.addSource(new MySource());streamSource.print();env.execute("MyApp");}
RichSourceFunction
如果需要更高級的功能和更豐富的生命周期控制,可以使用RichSourceFunction 類。RichSourceFunction是
SourceFunction接口的子類,它提供了額外的方法和功能,例如初始化、配置和資源管理。
操作步驟:
擴展RichSourceFunction 類:創(chuàng)建一個類,擴展 RichSourceFunction<T> 類,并將 T 替換為要發(fā)出的數(shù)據(jù)類型實現(xiàn)open() 方法:進行初始化操作,例如建立與外部系統(tǒng)的連接或加載資源等。這個方法是在數(shù)據(jù)源的生命周期開始時被調(diào)用的實現(xiàn)run() 方法:實現(xiàn)讀取數(shù)據(jù)并發(fā)出數(shù)據(jù)流的邏輯。這個方法在啟動數(shù)據(jù)源時會被調(diào)用實現(xiàn)cancel() 方法:添加取消數(shù)據(jù)源的邏輯。這個方法將在停止數(shù)據(jù)源時調(diào)用實現(xiàn)close() 方法:進行一些資源清理操作。這個方法是在數(shù)據(jù)源生命周期結(jié)束時調(diào)用的
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class CustomRichDataSource extends RichSourceFunction<String> {private volatile boolean isRunning = true;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化操作,例如建立與外部系統(tǒng)的連接或加載資源等}@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (isRunning) {// 讀取數(shù)據(jù)的邏輯// 發(fā)出數(shù)據(jù)到數(shù)據(jù)流ctx.collect("Hello, World!");// 控制發(fā)送數(shù)據(jù)的速度Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic void close() throws Exception {// 資源清理操作}
}
常見連接器
第三方系統(tǒng)連接器
Flink內(nèi)置了多種連接器,用于滿足大多數(shù)的數(shù)據(jù)收集場景。連接器可以和多種多樣的第三方系統(tǒng)進行交互。
Flink官方目前支持以下第三方系統(tǒng)連接器
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon DynamoDB (sink)
Amazon Kinesis Data Streams (source/sink)
Amazon Kinesis Data Firehose (sink)
DataGen (source)
Elasticsearch (sink)
Opensearch (sink)
FileSystem (sink)
RabbitMQ (source/sink)
Google PubSub (source/sink)
Hybrid Source (source)
Apache Pulsar (source)
JDBC (sink)
MongoDB (source/sink)
除Flink官方之外,還有一些其他第三方系統(tǒng)與Flink的連接器,通過Apache Bahir發(fā)布:
Apache ActiveMQ (source/sink)
Apache Flume (sink)
Redis (sink)
Akka (sink)
Netty (source)
File Source連接器
從文件讀取數(shù)據(jù)是一種常見方式,比如讀取日志文件,這是批處理中最常見的讀取方式
flink-connector-files
是Apache Flink的一個連接器
,用于將本地文件系統(tǒng)或遠程文件系統(tǒng)中的文件作為數(shù)據(jù)源或數(shù)據(jù)接收器使用。
它提供了一種簡單的方法來處理文本文件或其他格式的文件,例如CSV、JSON、Avro等,并將其轉(zhuǎn)換為Flink數(shù)據(jù)流。在使用時,可以指定文件的路徑、編碼方式和分隔符等參數(shù),并使用適當?shù)霓D(zhuǎn)換函數(shù)將文件內(nèi)容解析為Flink的數(shù)據(jù)類型,然后進行數(shù)據(jù)處理和分析。
它支持對輸出流的寫入操作,將Flink數(shù)據(jù)流中的結(jié)果寫入到指定的文件中。可以通過配置文件路徑、編碼方式和文件格式等參數(shù)來控制輸出文件的格式和內(nèi)容
添加文件連接器依賴
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>1.17.0</version><scope>provided</scope></dependency>
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 從文件流中逐條記錄讀取** 文件路徑參數(shù)可以是目錄,具體文件、以及從HDFS目錄下讀取* 路徑可以是相對路徑,也可以是絕對路徑;* 相對路徑是從系統(tǒng)屬性`user.dir`獲取路徑:idea下是`project的根目錄`,standalone模式下是`集群節(jié)點根目錄`*/FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/word.txt")).build();/*** source ——用戶定義的來源* sourceName – 數(shù)據(jù)源的名稱*/env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source").print();env.execute();}
DataGen Source連接器
Flink提供了一個內(nèi)置的DataGen連接器,主要用于生成一些隨機數(shù),進行流任務(wù)的測試以及性能測試
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>1.17.0</version>
</dependency>
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 如果有n個并行度, 最大值設(shè)為a// 將數(shù)值 均分成 n份, a/n ,比如,最大100,并行度2,每個并行度生成50個// 其中一個是 0-49,另一個50-99/*** DataGeneratorSource中,單個并行度生成數(shù)據(jù)個數(shù)與 與 生成的數(shù)據(jù)個數(shù) 相關(guān)* 公式: 生成的數(shù)據(jù)個數(shù) / 并行度 = 每個并行度生成個數(shù)* 例子: 并行度設(shè)置為2,生成數(shù)據(jù)個數(shù)100,則每個并行度生成個數(shù)=100/2. 一個并行度:0-49 另一個并行度:50-99*/env.setParallelism(2);/*** 數(shù)據(jù)生成器Source* GeneratorFunction<Long, OUT> generatorFunction : GeneratorFunction接口函數(shù)需要實現(xiàn), 重寫map方法, 輸入類型固定是Long* long count : 生成的數(shù)據(jù)個數(shù)。自動生成的數(shù)字序列,從0自增。當數(shù)字數(shù)序列最大值達到或小于這個值就停止* RateLimiterStrategy rateLimiterStrategy :限速策略,如每秒生成幾條數(shù)據(jù)* TypeInformation<OUT> typeInfo : 返回的數(shù)據(jù)類型**/DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},100,RateLimiterStrategy.perSecond(1),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator").print();env.execute();}
1> Number:0
2> Number:50
2> Number:51
1> Number:1
2> Number:52
1> Number:2
2> Number:53
Kafka Source連接器
Flink-connector-kafka就是Flink的一個連接器,它提供了一個簡單的方法來將Kafka作為Flink應(yīng)用程序的數(shù)據(jù)源或數(shù)據(jù)接收器使用。
flink-connector-kafka可以幫助Flink應(yīng)用程序從Kafka主題中讀取數(shù)據(jù),也可以將Flink的數(shù)據(jù)流寫入到Kafka主題中。在使用時,可以指定Kafka集群的地址、主題名稱、消費者組名稱等參數(shù),并使用適當?shù)男蛄谢头葱蛄谢ぞ邔?shù)據(jù)轉(zhuǎn)換為Flink的數(shù)據(jù)類型。
Topic、Partition訂閱
Kafka Source提供了3 種Topic、Partition的訂閱方式:
1.Topic 列表,訂閱 Topic 列表中所有Partition的消息:
KafkaSource.builder().setTopics("topic-a", "topic-b");
2.正則表達式匹配,訂閱與正則表達式所匹配的Topic下的所有Partition:
KafkaSource.builder().setTopicPattern("topic.*");
3.Partition列表,訂閱指定的 Partition:
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(new TopicPartition("topic-a", 0), // 主題為 "topic-a"的0號分區(qū)new TopicPartition("topic-b", 5))); // 主題為 "topic-b"的5號分區(qū)
KafkaSource.builder().setPartitions(partitionSet);
起始消費位點
Kafka source 能夠通過位點初始化器(OffsetsInitializer)來指定從不同的偏移量開始消費 。
如果內(nèi)置的初始化器不能滿足需求,也可以實現(xiàn)自定義的位點初始化器OffsetsInitializer
如果未指定位點初始化器,將默認使用 OffsetsInitializer.earliest()
內(nèi)置的位點初始化器包括:
KafkaSource.builder()// 從消費組提交的位點開始消費,不指定位點重置策略.setStartingOffsets(OffsetsInitializer.committedOffsets())// 從消費組提交的位點開始消費,如果提交位點不存在,使用最早位點.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))// 從時間戳大于等于指定時間戳(毫秒)的數(shù)據(jù)開始消費.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))// 從最早位點開始消費.setStartingOffsets(OffsetsInitializer.earliest())// 從最末尾位點開始消費.setStartingOffsets(OffsetsInitializer.latest());
動態(tài)分區(qū)檢查
為了在不重啟Flink作業(yè)的情況下處理Topic擴容或新建Topic等場景,可以將KafkaSource配置為在提供的Topic/Partition訂閱模式下定期檢查新分區(qū)。分區(qū)檢查功能默認不開啟。
KafkaSource.builder().setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒檢查一次新分區(qū)
事件時間和水印
默認情況下,Kafka Source使用Kafka消息中的時間戳作為事件時間??梢远x自己的水印策略(Watermark Strategy) 以從消息中提取事件時間,并向下游發(fā)送水印:
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");
示例
引入Kafka連接器依賴
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version></dependency>
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);KafkaSource<String> kafkaSource = KafkaSource.<String>builder()// 指定kafka節(jié)點的地址和端口.setBootstrapServers("node01:9092,node02:9092,node03:9092")// 指定消費者組的id.setGroupId("flink_group")// 指定消費的 Topic.setTopics("flink_topic")// 指定反序列化器,反序列化value.setValueOnlyDeserializer(new SimpleStringSchema())// flink消費kafka的策略.setStartingOffsets(OffsetsInitializer.latest()).build();// 不使用 watermark 的策略,意味著數(shù)據(jù)流不會根據(jù)事件時間進行處理DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");stream.print("Kafka");// 定義事件時間watermark策略,處理數(shù)據(jù)流中的無序事件,并設(shè)置最大延遲時間為3秒。DataStreamSink<String> kafka_source = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafka_source").print("Kafka");env.execute();}
RabbitMQ Source連接器
添加對RabbitMQ連接器的依賴
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>
1.服務(wù)質(zhì)量 (QoS)
服務(wù)質(zhì)量是一種用于控制數(shù)據(jù)源連接器如何消費消息的策略。在Flink中,服務(wù)質(zhì)量定義了消費者和消息代理之間的消息傳輸保證級別。通過合適的服務(wù)質(zhì)量設(shè)置,可以實現(xiàn)以下不同的語義保證:
Exactly-once:確保消息僅被正確處理一次At-least-once:確保消息至少被正確處理一次None(最多一次):不提供消息處理保證,可能會出現(xiàn)重復處理或丟失消息的情況
1.精確一次:
保證精確一次需要以下條件
開啟checkpointing: 開啟之后,消息在checkpoints完成之后才會被確認,然后從RabbitMQ隊列中刪除使用關(guān)聯(lián)標識Correlationids: 關(guān)聯(lián)標識是RabbitMQ的一個特性,消息寫入RabbitMQ時在消息屬性中設(shè)置。從checkpoint恢復時有些消息可能會被重復處理,source可以利用關(guān)聯(lián)標識對消息進行去重。非并發(fā)source: 為了保證精確一次的數(shù)據(jù)投遞,source必須是非并發(fā)的(并行度設(shè)置為1)。這主要是由于RabbitMQ分發(fā)數(shù)據(jù)時是從單隊列向多個消費者投遞消息的。
2.至少一次:
在checkpointing開啟的條件下,如果沒有使用關(guān)聯(lián)標識或者source是并發(fā)的,那么source就只能提供至少一次的保證。
3.無任何保證:
如果沒有開啟checkpointing,source就不能提供任何的數(shù)據(jù)投遞保證。使用這種設(shè)置時,source一旦接收到并處理消息,消息就會被自動確認。
2.消費者預(yù)取Consumer Prefetch
注意:
默認情況下是不設(shè)置prefetch count的,這意味著RabbitMQ服務(wù)器將會無限制地向source發(fā)送消息。因此在生產(chǎn)環(huán)境中,最好要設(shè)置它。
prefetch count是對單個channel設(shè)置的,并且由于每個并發(fā)的source都持有一個connection/channel,因此這個值實際上會乘以 source 的并行度,來表示同一時間可以向這個job總共發(fā)送多少條未確認的消息。
使用
setPrefetchCount()
方法用于設(shè)置消費者預(yù)取值,這里將其設(shè)置為 10。這意味著每個消費者在處理完 10 條消息之前不會從 RabbitMQ 隊列中獲取更多的消息。
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setPrefetchCount(10) //設(shè)置消費者預(yù)取值為 10....build();
以下是保證exactly-once的RabbitMQ source示例
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 啟用檢查點(checkpointing)以實現(xiàn)精確一次或至少一次的一致性保證env.enableCheckpointing(5000); // 每 5000 毫秒執(zhí)行一次檢查點final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost") // RabbitMQ 主機名.setPort(5672) // RabbitMQ 端口號.setUserName("guest") // RabbitMQ 用戶名.setPassword("guest") // RabbitMQ 密碼.setVirtualHost("/") // RabbitMQ 虛擬主機.setPrefetchCount(10) // 設(shè)置消費者預(yù)取值為 10.build();final DataStream<String> stream = env.addSource(new RMQSource<String>(connectionConfig, // RabbitMQ 連接配置"queueName", // 需要消費的 RabbitMQ 隊列名true, // 是否使用關(guān)聯(lián) ID;如果僅需要至少一次的保證,可以設(shè)置為 falsenew SimpleStringSchema())) // 反序列化方案,將消息轉(zhuǎn)換為 Java 對象.setParallelism(1); // 非并行的源,僅在需要精確一次性保證時才需要設(shè)置stream.print();env.execute("RabbitMQ Source Example");}
MongoDB Source連接器
Flink 提供了MongoDB 連接器使用至少一次(At-least-once)的語義在 MongoDB collection中讀取和寫入數(shù)據(jù)。
要使用此連接器,先添加依賴到項目中:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mongodb</artifactId><version>1.0.1-1.17</version>
</dependency>
public static void main(String[] args) throws Exception {MongoSource<String> source = MongoSource.<String>builder()// MongoDB 連接 URI.setUri("mongodb://user:password@127.0.0.1:27017")// 數(shù)據(jù)庫名.setDatabase("my_db")// 集合名.setCollection("my_coll")// 投影的字段.setProjectedFields("_id", "f0", "f1")// 默認值: 2048 設(shè)置每次循環(huán)讀取時應(yīng)該從游標中獲取的行數(shù).setFetchSize(2048)// 默認值:-1 限制每個reader最多讀取文檔的數(shù)量。如果設(shè)置了讀取并行度大于1,那么最多讀取的文檔數(shù)量等于 并行度 * 限制數(shù)量。.setLimit(10000)// 默認值: true 不使用游標超時 防止cursor因為讀取時間過長或者背壓導致的空閑而關(guān)閉.setNoCursorTimeout(true)/*** 使用分區(qū)可以利用并行讀取來加速整體的讀取效率。** 設(shè)置分區(qū)策略,可選的分區(qū)策略有 SINGLE,SAMPLE,SPLIT_VECTOR,SHARDED 和 DEFAULT** SINGLE:將整個集合作為一個分區(qū)。* SAMPLE:通過隨機采樣的方式來生成分區(qū),快速但可能不均勻。* SPLIT_VECTOR:通過 MongoDB 計算分片的 splitVector 命令來生成分區(qū),快速且均勻。 僅適用于未分片集合,需要 splitVector 權(quán)限。* SHARDED:從 config.chunks 集合中直接讀取分片集合的分片邊界作為分區(qū),不需要額外計算,快速且均勻。 僅適用于已經(jīng)分片的集合,需要 config 數(shù)據(jù)庫的讀取權(quán)限。* DEFAULT:對分片集合使用 SHARDED 策略,對未分片集合使用 SPLIT_VECTOR 策略。*/.setPartitionStrategy(PartitionStrategy.SAMPLE) // 設(shè)置每個分區(qū)的內(nèi)存大小,默認值:64mb 通過指定的分區(qū)大小,將 MongoDB 的一個集合切分成多個分區(qū)。 可以設(shè)置并行度,并行地讀取這些分區(qū),以提升整體的讀取速度。.setPartitionSize(MemorySize.ofMebiBytes(64))// 默認值:10 僅用于 SAMPLE 抽樣分區(qū)策略,設(shè)置每個分區(qū)的樣本數(shù)量。抽樣分區(qū)器根據(jù)分區(qū)鍵對集合進行隨機采樣的方式計算分區(qū)邊界。 總的樣本數(shù)量 = 每個分區(qū)的樣本數(shù)量 * (文檔總數(shù) / 每個分區(qū)的文檔數(shù)量).setSamplesPerPartition(10)// 設(shè)置 MongoDeserializationSchema 用于解析 MongoDB BSON 類型的文檔.setDeserializationSchema(new MongoDeserializationSchema<String>() {@Overridepublic String deserialize(BsonDocument document) {return document.toJson();}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}) // 自定義的反序列化方案.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB-Source").setParallelism(2) // 設(shè)置并行度為 2.print().setParallelism(1); // 設(shè)置并行度為 1env.execute("MongoDB Source Example");}