常州做網(wǎng)站的公司有哪些今天實(shí)時熱搜榜排名
大綱
- 新建工程
- 自定義無界流
- 使用
- 打包、提交、運(yùn)行
- 工程代碼
在《Java版Flink使用指南——從RabbitMQ中隊(duì)列中接入消息流》一文中,我們讓外部組件RabbitMQ充當(dāng)了無界流的數(shù)據(jù)源,使得Flink進(jìn)行了流式處理。在《Java版Flink使用指南——將消息寫入到RabbitMQ的隊(duì)列中》一文中,我們使用了Flink自帶的數(shù)據(jù)生成器,生成了有限數(shù)據(jù),從而讓Flink以批處理形式運(yùn)行了該任務(wù)。
本文我們將自定義一個無界流生成器,以方便后續(xù)測試。
新建工程
我們新建一個名字叫UnboundedStreamGenerator的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
自定義無界流
新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
然后UnBoundedStreamGenerator實(shí)現(xiàn)RichSourceFunction接口
public abstract class RichSourceFunction<OUT> extends AbstractRichFunctionimplements SourceFunction<OUT> {private static final long serialVersionUID = 1L;
}
主要實(shí)現(xiàn)SourceFunction接口的run和cancel方法。run方法用來獲取獲取,cancel方法用于終止任務(wù)。
package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count++); // Emit data}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}
在run方法中,我們每隔一秒產(chǎn)生一條數(shù)據(jù),且這個數(shù)字自增。
使用
我們使用addSource方法,將該無界流生成器添加成數(shù)據(jù)源。然后將其輸出到日志。
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.generator.UnBoundedStreamGenerator;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new UnBoundedStreamGenerator()).name("Custom Stream Source").setParallelism(1) .print(); // For demonstration, print the stream to stdout// Execute program, beginning computation.env.execute("Flink Java API Skeleton");}
}
打包、提交、運(yùn)行
使用下面命令查看日志輸出
tail -f log/*
然后我們在后臺點(diǎn)擊Cancel Job
可以看到輸出
工程代碼
https://github.com/f304646673/FlinkDemo