聊城網(wǎng)站營銷簡述提升關(guān)鍵詞排名的方法
使用 Flink 消費(fèi) Kafka 中 ChangeRecord 主題的數(shù)據(jù),每隔 1 分鐘輸出最近 3 分鐘的預(yù)警次數(shù)最多的 設(shè)備,將結(jié)果存入Redis 中, key 值為 “warning_last3min_everymin_out” , value 值為 “ 窗口結(jié)束時間,設(shè)備id” (窗口結(jié)束時間格式: yyyy-MM-dd HH:mm:ss )。使用 redis cli 以 HGETALL key方式獲取 warning_last3min_everymin_out值。注:時間語義使用 Processing Time 。
-
Kafka Source
- 從 Kafka 中讀取實(shí)時的設(shè)備預(yù)警數(shù)據(jù),數(shù)據(jù)內(nèi)容應(yīng)當(dāng)包括設(shè)備 ID 和預(yù)警狀態(tài)等信息。
- 數(shù)據(jù)通過?
SimpleStringSchema
?反序列化為字符串格式,再由?parseMessage
?進(jìn)行解析和提取。
-
流處理與窗口
- Flink 使用滑動時間窗口 (
SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))
) 來計(jì)算每 1 分鐘內(nèi)過去 3 分鐘內(nèi)的設(shè)備預(yù)警數(shù)據(jù)。 - 這意味著每 1 分鐘計(jì)算一次,在每次計(jì)算中,會考慮過去 3 分鐘內(nèi)的數(shù)據(jù),因此具有滑動窗口的特點(diǎn)。
- Flink 使用滑動時間窗口 (
-
窗口函數(shù)
- 在?
MaxNumWarnMachineID
?中,窗口內(nèi)的數(shù)據(jù)按設(shè)備 ID 分組,統(tǒng)計(jì)每個設(shè)備的預(yù)警次數(shù),并選出預(yù)警次數(shù)最多的設(shè)備 ID。 apply
?方法處理窗口內(nèi)的數(shù)據(jù)后,輸出一個包含時間戳(窗口結(jié)束時間)和設(shè)備 ID 的元組。
- 在?
-
Redis Sink
- 計(jì)算后的每個時間窗口的最大預(yù)警設(shè)備 ID 將通過 Redis Sink 寫入 Redis,數(shù)據(jù)結(jié)構(gòu)為?
HSET
。 - Redis 中的鍵為?
warning_last3min_everymin_out
,值為設(shè)備 ID。
- 計(jì)算后的每個時間窗口的最大預(yù)警設(shè)備 ID 將通過 Redis Sink 寫入 Redis,數(shù)據(jù)結(jié)構(gòu)為?
?
package flink.calculate.ChangeRecordimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.{KafkaSource, KafkaSourceBuilder}
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable// 定義常量
object Constants {val TOPIC_NAME = "ChangeRecord"val BOOTSTRAP_SERVERS = "192.168.222.101:9092,192.168.222.102:9092,192.168.222.103:9092"val REDIS_HOST = "192.168.222.101"
}// 主程序邏輯
object WarningLast3MinEveryMinOut {def main(args: Array[String]): Unit = {// 創(chuàng)建流執(zhí)行環(huán)境并配置val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) // 設(shè)置作業(yè)并行度// 構(gòu)建Kafka數(shù)據(jù)源val kafkaSource = buildKafkaSource()// 從Kafka讀取數(shù)據(jù)并處理val dataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), Constants.TOPIC_NAME).map(parseMessage) // 解析消息為 (標(biāo)識符, 設(shè)備ID, 狀態(tài)).filter(_._3 == "預(yù)警") // 過濾非預(yù)警狀態(tài)的數(shù)據(jù).keyBy(_._1) // 按標(biāo)識符分組.windowAll(SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) // 滑動窗口.apply(new MaxNumWarnMachineID) // 應(yīng)用窗口函數(shù)計(jì)算每分鐘內(nèi)過去3分鐘的最多預(yù)警設(shè)備// 輸出到控制臺和RedisdataStream.print("Result =>")dataStream.addSink(buildRedisSink())// 執(zhí)行Flink作業(yè)env.execute("WarningLast3MinEveryMinOut Job")}// 構(gòu)建Kafka數(shù)據(jù)源private def buildKafkaSource(): KafkaSource[String] = {KafkaSource.builder[String]().setTopics(Constants.TOPIC_NAME).setBootstrapServers(Constants.BOOTSTRAP_SERVERS).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build()}// 解析來自Kafka的消息為元組private def parseMessage(message: String): (String, String, String) = {val fields = message.split(",")("warning_last3min_everymin_out", fields(1), fields(3))}// 構(gòu)建Redis Sinkprivate def buildRedisSink(): ConnRedis.RedisSink[(String, String)] = {new ConnRedis(Constants.REDIS_HOST, 6379).getRedisSink(new Last3MinRedisMapper)}
}// 預(yù)警設(shè)備計(jì)數(shù)窗口函數(shù)
class MaxNumWarnMachineID extends AllWindowFunction[(String, String, String), (String, String), TimeWindow] {override def apply(window: TimeWindow, input: Iterable[(String, String, String)], out: Collector[(String, String)]): Unit = {// 統(tǒng)計(jì)每個設(shè)備ID的預(yù)警次數(shù)val machineCounts = input.groupBy(_._2).view.mapValues(_.size)// 獲取窗口結(jié)束時間val windowEndTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(window.getEnd))// 獲取預(yù)警次數(shù)最多的設(shè)備IDif (machineCounts.nonEmpty) {val maxMachineId = machineCounts.maxBy(_._2)._1out.collect((windowEndTime, maxMachineId))}}
}// Redis映射器
private class Last3MinRedisMapper extends RedisMapper[(String, String)] {override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "warning_last3min_everymin_out")override def getKeyFromData(data: (String, String)): String = data._1override def getValueFromData(data: (String, String)): String = data._2
}
?