国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

簡潔網站欣賞路由優(yōu)化大師

簡潔網站欣賞,路由優(yōu)化大師,邯鄲建設網站制作,wordpress多語言設置熱詞統(tǒng)計案例: 用flink中的窗口函數(apply)讀取kafka中數據,并對熱詞進行統(tǒng)計。 apply:全量聚合函數,指在窗口觸發(fā)的時候才會對窗口內的所有數據進行一次計算(等窗口的數據到齊,才開始進行聚合…

熱詞統(tǒng)計案例:

用flink中的窗口函數(apply)讀取kafka中數據,并對熱詞進行統(tǒng)計。

apply:全量聚合函數,指在窗口觸發(fā)的時候才會對窗口內的所有數據進行一次計算(等窗口的數據到齊,才開始進行聚合計算,可實現對窗口內的數據進行排序等需求)。

代碼演示:

kafka發(fā)送消息端:?

package com.bigdata.Day04;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class Demo01_windows_kafka發(fā)消息 {public static void main(String[] args) throws Exception {// Properties 它是map的一種Properties properties = new Properties();// 設置連接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 創(chuàng)建了一個消息生產者對象KafkaProducer kafkaProducer = new KafkaProducer<>(properties);String[] arr = {"聯通換貓","遙遙領先","恒大歌舞團","恒大足球隊","鄭州爛尾樓"};Random random = new Random();for (int i = 0; i < 500; i++) {ProducerRecord record = new ProducerRecord<>("topic1",arr[random.nextInt(arr.length)]);// 調用這個里面的send方法kafkaProducer.send(record);Thread.sleep(50);}kafkaProducer.close();}
}

kafka接受消息端:?

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-準備環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加載數據Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-數據處理轉換DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(tuple2 -> tuple2.f0);keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一個泛型是輸入數據的類型,第二個泛型是返回值類型   第三個是key 的類型, 第四個是窗口對象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分組key    {"俄烏戰(zhàn)爭",[1,1,1,1,1]}TimeWindow window, // 窗口對象Iterable<Tuple2<String, Integer>> input, // 分組key在窗口的所有數據Collector<String> out  // 用于輸出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具類String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-執(zhí)行env.execute();}
}

當執(zhí)行kafka接收消息端時,會報如下錯誤:?

?錯誤原因:在對kafka中數據進行KeyBy分組處理時,使用了lambda表達式

?

解決方法:

在使用KeyBy時,將函數的各種參數類型都寫清楚,修改后的代碼如下:

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-準備環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加載數據Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-數據處理轉換DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一個泛型是輸入數據的類型,第二個泛型是返回值類型   第三個是key 的類型, 第四個是窗口對象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分組key    {"俄烏戰(zhàn)爭",[1,1,1,1,1]}TimeWindow window, // 窗口對象Iterable<Tuple2<String, Integer>> input, // 分組key在窗口的所有數據Collector<String> out  // 用于輸出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具類String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-執(zhí)行env.execute();}
}

http://aloenet.com.cn/news/47412.html

相關文章:

  • 使用php做的網站有哪些西安自助建站
  • 裝飾公司手機網站湖南網站seo地址
  • 做我韓國連續(xù)劇網站線上營銷技巧和營銷方法
  • asp.net 網站壓縮線下推廣宣傳方式有哪些
  • 網絡培訓的網站建設搜索引擎優(yōu)化要考慮哪些方面
  • 網站標題關鍵字營銷方案策劃書
  • 灌云縣建設局網站營銷公司
  • 海外推廣有前途嗎無錫seo網絡推廣
  • wordpress分享js代碼無錫網站seo顧問
  • 多城市網站如何做seo建立網站的步驟
  • 京東內部券網站怎么做百度公司全稱
  • 好的做網站東莞百度推廣排名
  • 培訓的網站建設鳴蟬智能建站
  • 怎樣看出一個網站是那個公司做的合肥百度推廣公司哪家好
  • 鄭州網站設計專家seo掛機賺錢
  • 北京國貿網站建設網絡優(yōu)化器下載
  • 網站制作公司承擔seo外包公司多嗎
  • 網站加關鍵詞代碼培訓網站建設
  • 仿網站制作教學視頻網絡營銷與直播電商怎么樣
  • 四川成都營銷型網站數據分析網站
  • 網站建設 后端前端廣告聯盟平臺哪個好
  • jsp是前端還是后端開發(fā)的煙臺seo網絡推廣
  • 在華圖做網站編輯友情鏈接交換條件
  • 百度收錄網站定位地址公司網絡推廣營銷
  • 網站開發(fā)需要什么資質百度官方網頁版
  • 怎么健手機網站最新新聞熱點事件及評論
  • 彈性云主機做網站營銷策略范文
  • 論壇網站建設網站推廣的方法有哪幾種
  • 現在購物平臺哪個最好seo黑帽培訓
  • 網站建設策劃案怎么寫推廣普通話手抄報文字內容