制作一個網(wǎng)站需要多少錢百度托管公司
增量聚合的 ProcessWindowFunction?#
ProcessWindowFunction
?可以與?ReduceFunction
?或?AggregateFunction
?搭配使用, 使其能夠在數(shù)據(jù)到達窗口的時候進行增量聚合。當窗口關(guān)閉時,ProcessWindowFunction
?將會得到聚合的結(jié)果。 這樣它就可以增量聚合窗口的元素并且從 ProcessWindowFunction` 中獲得窗口的元數(shù)據(jù)。
你也可以對過時的?WindowFunction
?使用增量聚合。
使用 ReduceFunction 增量聚合?#
- 下例展示了如何將?
ReduceFunction
?與?ProcessWindowFunction
?組合,返回窗口中的最小元素和窗口的開始時間。
?
DataStream<SensorReading> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new MyReduceFunction(), new MyProcessWindowFunction());// Function definitionsprivate static class MyReduceFunction implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r2 : r1;}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {public void process(String key,Context context,Iterable<SensorReading> minReadings,Collector<Tuple2<Long, SensorReading>> out) {SensorReading min = minReadings.iterator().next();out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));}
}
通俗解釋:窗口開始時間的作用
我們可以用一個更貼近生活的例子來理解?窗口開始時間?的意義。
場景比喻:每天上午的「溫度統(tǒng)計報告
假設(shè)你有一個氣象站,每5分鐘記錄一次戶外溫度。現(xiàn)在需要?每小時(例如8:00-9:00)統(tǒng)計一次該時段內(nèi)的最低溫度,并在報告中標注這個小時段的起始時間(如“8:00-9:00的最低溫度是15°C”)。
關(guān)鍵點
-
窗口開始時間:就是時間段的起點(如8:00)。
-
窗口結(jié)束時間:就是時間段的終點(如9:00)。
-
為什么要記錄開始時間?
方便人類理解數(shù)據(jù)屬于哪個時段(比如“8點檔”的數(shù)據(jù))。
代碼示例解析
1. 窗口如何劃分?
假設(shè)使用?滾動窗口(Tumbling Window),每1小時劃分一次:
Copy
8:00-9:00 → 窗口1 9:00-10:00 → 窗口2
所有時間戳在8:00≤t<9:00的數(shù)據(jù)會被分配到窗口1。
2. 窗口觸發(fā)計算的時機
當系統(tǒng)時間(或事件時間)到達9:00時,窗口1關(guān)閉,觸發(fā)計算:
-
調(diào)用?
MyReduceFunction
?找出該窗口內(nèi)的最低溫度。 -
調(diào)用?
MyProcessWindowFunction
?將結(jié)果與窗口開始時間(8:00)綁定。
3. 為什么輸出的是開始時間(8:00)而不是結(jié)束時間(9:00)?
-
業(yè)務(wù)需求:通常更關(guān)心數(shù)據(jù)所屬時段的起點(例如“8點檔的數(shù)據(jù)”)。
-
避免歧義:如果輸出9:00,可能被誤解為“9點檔的數(shù)據(jù)”(實際是8:00-9:00的數(shù)據(jù))。
代碼中具體如何獲取開始時間?
在?MyProcessWindowFunction
?中:
context.window().getStart(); // 返回窗口的起始時間戳(如8:00對應(yīng)的毫秒值)
-
context
?對象:包含窗口的元信息(起止時間、觸發(fā)時間等)。 -
實際輸出時:將時間戳轉(zhuǎn)換為人類可讀格式(如?
8:00
)。
常見疑問解答
Q1:如果數(shù)據(jù)延遲到達(比如8:59的數(shù)據(jù)在9:05才到),會進入哪個窗口?
-
取決于時間語義:
-
若使用?事件時間(Event Time):按數(shù)據(jù)自帶的時間戳分配到8:00-9:00窗口。
-
若使用?處理時間(Processing?Time):按到達系統(tǒng)的時間分配到9:00-10:00窗口。
-
(示例代碼未顯式設(shè)置時間語義,默認可能是處理時間)
-
Q2:窗口開始時間是如何計算的?
-
由窗口分配器(Window Assigner)決定:
-
滾動窗口按固定間隔對齊(如整點)。
-
滑動窗口按步長對齊(如每30分鐘滑動一次的1小時窗口)。
-
會話窗口根據(jù)數(shù)據(jù)活躍度動態(tài)劃分。
-
Q3:可以同時輸出開始時間和結(jié)束時間嗎?
可以!修改?ProcessWindowFunction
:
out.collect(new Tuple3<>(context.window().getStart(), context.window().getEnd(), min));
總結(jié)
-
窗口開始時間?標記了數(shù)據(jù)所屬時間段的起點(如“8:00檔”)。
-
在 Flink 中,通過?
ProcessWindowFunction
?的?context
?可以輕松獲取這一信息。 -
這種設(shè)計讓數(shù)據(jù)處理結(jié)果更易理解(如統(tǒng)計報告、監(jiān)控儀表盤)。