坪山住房和建設局網(wǎng)站推銷
Flink在早期版本有一個split
算子用來做數(shù)據(jù)分流
使用的,但是在flink-1.12
開始這個API
就已經(jīng)被刪除了,在1.12
版本以后我們是通過process
算子來做數(shù)據(jù)分流的,這里就介紹一下如何使用prodess
進行數(shù)據(jù)分流.
- 代碼
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/7* @Description: 測流輸出**/
public class FlinkSideOutput {public static void main(String[] args) throws Exception {// 構(gòu)建流環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 設置并行度env.setParallelism(3);// 這里使用的是自定義數(shù)據(jù)源為了方便測試,具體數(shù)據(jù)源根據(jù)自己的實際情況進行更換DataStreamSource<CustomizeBean> customizeSourceStream = env.addSource(new CustomizeSource());/*** 需求* 1. 將性別為M且愛好為'羽毛球運動愛好者'分到一個流* 2. 將性別為W且愛好為'籃球運動愛好者'或'釣魚愛好者'分到一個流* 3. 其他保留到主流**/SingleOutputStreamOperator<CustomizeBean> processedStream = customizeSourceStream.process(new ProcessFunction<CustomizeBean, CustomizeBean>() {@Overridepublic void processElement(CustomizeBean value, ProcessFunction<CustomizeBean, CustomizeBean>.Context ctx, Collector<CustomizeBean> out) throws Exception {String gender = value.getGender(); // 性別String hobbit = value.getHobbit(); // 愛好if (gender.equals("M") && hobbit.equals("羽毛球運動愛好者")) {// 將性別為M且愛好為'羽毛球運動愛好者'進行分流, 注意這里要聲明類型,Java無法自行推斷ctx.output(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)), value);} else if (gender.equals("W") && (hobbit.equals("籃球運動愛好者") || hobbit.equals("釣魚愛好者"))) {// 將性別為W且愛好為'籃球運動愛好者'或'釣魚愛好者'進行分流, 注意這里要聲明類型,Java無法自行推斷ctx.output(new OutputTag<CustomizeBean>("W-籃球/釣魚", TypeInformation.of(CustomizeBean.class)), value);} else {// 將剩下的數(shù)據(jù)保留在主流中out.collect(value);}}});// 獲取'M-羽毛球'分流數(shù)據(jù),這里也要加上類型聲明DataStream<CustomizeBean> mSideOutput = processedStream.getSideOutput(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)));// 打印'M-羽毛球'結(jié)果mSideOutput.print("M-羽毛球");// 獲取'W-籃球/釣魚'分流數(shù)據(jù),這里也要加上類型聲明DataStream<CustomizeBean> wSideOutput = processedStream.getSideOutput(new OutputTag<CustomizeBean>("W-籃球/釣魚", TypeInformation.of(CustomizeBean.class)));// 打印結(jié)果wSideOutput.print("W-籃球/釣魚");// 主流數(shù)據(jù)打印結(jié)果processedStream.print("主數(shù)據(jù)流");env.execute("Side Output");}
}
- 結(jié)果數(shù)據(jù)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-641, age=44, gender=W, hobbit=非遺文化愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-17, age=62, gender=M, hobbit=書法愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-429, age=25, gender=W, hobbit=非遺文化愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-218, age=33, gender=M, hobbit=旅游愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-826, age=39, gender=M, hobbit=籃球運動愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-190, age=31, gender=M, hobbit=旅游愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-266, age=32, gender=W, hobbit=網(wǎng)吧戰(zhàn)神)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-106, age=70, gender=M, hobbit=書法愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-911, age=50, gender=M, hobbit=網(wǎng)吧戰(zhàn)神)
M-羽毛球:2> CustomizeBean(name=AAA-925, age=65, gender=M, hobbit=羽毛球運動愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-20, age=59, gender=M, hobbit=書法愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-409, age=79, gender=W, hobbit=天文知識愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-865, age=58, gender=W, hobbit=天文知識愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-898, age=33, gender=M, hobbit=天文知識愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-85, age=38, gender=W, hobbit=非遺文化愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-883, age=51, gender=M, hobbit=美食愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-243, age=37, gender=M, hobbit=釣魚愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-430, age=28, gender=W, hobbit=旅游愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-127, age=65, gender=W, hobbit=網(wǎng)吧戰(zhàn)神)
W-籃球/釣魚:3> CustomizeBean(name=AAA-986, age=52, gender=W, hobbit=釣魚愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-840, age=50, gender=W, hobbit=旅游愛好者)
M-羽毛球:2> CustomizeBean(name=AAA-196, age=34, gender=M, hobbit=羽毛球運動愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-142, age=46, gender=W, hobbit=乒乓球運動愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-985, age=78, gender=W, hobbit=美食愛好者)
W-籃球/釣魚:2> CustomizeBean(name=AAA-490, age=50, gender=W, hobbit=釣魚愛好者)
主數(shù)據(jù)流:3> CustomizeBean(name=AAA-295, age=77, gender=M, hobbit=籃球運動愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-754, age=50, gender=M, hobbit=天文知識愛好者)
主數(shù)據(jù)流:2> CustomizeBean(name=AAA-249, age=35, gender=W, hobbit=羽毛球運動愛好者)
W-籃球/釣魚:3> CustomizeBean(name=AAA-908, age=27, gender=W, hobbit=釣魚愛好者)
主數(shù)據(jù)流:1> CustomizeBean(name=AAA-674, age=73, gender=M, hobbit=非遺文化愛好者)
通過結(jié)果內(nèi)容可以看到數(shù)據(jù)完全按照我們分流的邏輯進行輸出的,如果想在主數(shù)據(jù)流中講所有數(shù)據(jù)保留下來,Collector<Object> out
單獨拎出來即可,也就是不加到判斷邏輯中,代碼如下,這里就只展示部分代碼了
SingleOutputStreamOperator<CustomizeBean> processedStream = customizeSourceStream.process(new ProcessFunction<CustomizeBean, CustomizeBean>() {@Overridepublic void processElement(CustomizeBean value, ProcessFunction<CustomizeBean, CustomizeBean>.Context ctx, Collector<CustomizeBean> out) throws Exception {String gender = value.getGender(); // 性別String hobbit = value.getHobbit(); // 愛好// 將所有數(shù)據(jù)保留在主流中out.collect(value);// 開始進行分流處理if (gender.equals("M") && hobbit.equals("羽毛球運動愛好者")) {// 將性別為M且愛好為'羽毛球運動愛好者'進行分流, 注意這里要聲明類型,Java無法自行推斷ctx.output(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)), value);} else if ((gender.equals("W") && (hobbit.equals("籃球運動愛好者")) || (gender.equals("W") && hobbit.equals("釣魚愛好者")))) {// 將性別為W且愛好為'籃球運動愛好者'或'釣魚愛好者'進行分流, 注意這里要聲明類型,Java無法自行推斷ctx.output(new OutputTag<CustomizeBean>("W-籃球/釣魚", TypeInformation.of(CustomizeBean.class)), value);}}});
所有的內(nèi)容到這里就結(jié)束了.