国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

dw企業(yè)網站設計品牌營銷包括哪些內容

dw企業(yè)網站設計,品牌營銷包括哪些內容,wordpress 添加備案,硅云wordpress多站點目錄 0. 相關文章鏈接 1. 基本操作 1.1. 弱類型 api 1.2. 強類型 1.3. 直接執(zhí)行 sql 2. 基于 event-time 的窗口操作 2.1. event-time 窗口理解 2.2. event-time 窗口生成規(guī)則 3. 基于 Watermark 處理延遲數據 3.1. 什么是 Watermark 機制 3.2. update 模式下使用 w…

目錄

0. 相關文章鏈接

1. 基本操作

1.1.?弱類型 api

1.2. 強類型

1.3.?直接執(zhí)行 sql

2.?基于 event-time 的窗口操作

2.1.?event-time 窗口理解

2.2.?event-time 窗口生成規(guī)則

3.?基于 Watermark 處理延遲數據

3.1. 什么是 Watermark 機制

3.2.?update 模式下使用 watermark

3.3.?append 模式下使用 wartermark

3.4. watermark 機制總結

4.?流數據去重

5. join操作

5.1.?Stream-static Joins

5.1.1.?內連接

5.1.2.?外連接

5.2.?Stream-stream Joins

5.2.1.?inner join

4.2.2.?outer join

6.?Streaming DF/DS 不支持的操作


0. 相關文章鏈接

?Spark文章匯總?

1. 基本操作

在 DF/DS 上大多數通用操作都支持作用在 Streaming DataFrame/Streaming DataSet 上。

準備處理數據:?people.json

{"name": "Michael","age": 29,"sex": "female"}
{"name": "Andy","age": 30,"sex": "male"}
{"name": "Justin","age": 19,"sex": "male"}
{"name": "Lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}

1.1.?弱類型 api

代碼示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 創(chuàng)建格式,并讀取數據val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 弱類型 apival df: DataFrame = peopleDF.select("name", "age", "sex").where("age > 20")df.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}

結果輸出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|Michael| 29|female|
|   Andy| 30|  male|
|zhiling| 40|female|
+-------+---+------+

1.2. 強類型

代碼示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 創(chuàng)建格式,并讀取數據val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 強類型,轉成 dsval peopleDS: Dataset[People] = peopleDF.as[People]val df: Dataset[String] = peopleDS.filter((_: People).age > 20).map((_: People).name)df.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}case class People(name: String, age: Long, sex: String)

結果輸出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+
|  value|
+-------+
|Michael|
|   Andy|
|zhiling|
+-------+

1.3.?直接執(zhí)行 sql

代碼示例:

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 創(chuàng)建格式,并讀取數據val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 直接執(zhí)行SQL,創(chuàng)建臨時表peopleDF.createOrReplaceTempView("people")val df: DataFrame = spark.sql("select * from people where age > 20")df.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}

結果輸出:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|Michael| 29|female|
|   Andy| 30|  male|
|zhiling| 40|female|
+-------+---+------+

2.?基于 event-time 的窗口操作

2.1.?event-time 窗口理解

????????在 Structured Streaming 中, 可以按照事件發(fā)生時的時間對數據進行聚合操作, 即基于 event-time 進行操作。在這種機制下, 即不必考慮 Spark 陸續(xù)接收事件的順序是否與事件發(fā)生的順序一致, 也不必考慮事件到達 Spark 的時間與事件發(fā)生時間的關系。因此, 它在提高數據處理精度的同時, 大大減少了開發(fā)者的工作量。我們現在想計算 10 分鐘內的單詞, 每 5 分鐘更新一次, 也就是說在 10 分鐘窗口 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20等之間收到的單詞量。 注意, 12:00 - 12:10 表示數據在 12:00 之后 12:10 之前到達?,F在,考慮一下在 12:07 收到的單詞。單詞應該增加對應于兩個窗口12:00 - 12:10和12:05 - 12:15的計數。因此,計數將由分組鍵(即單詞)和窗口(可以從事件時間計算)索引。

統(tǒng)計后的結果應該是這樣的:

代碼示例:

import org.apache.spark.sql.functions.window
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數據源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).option("includeTimestamp", value = true) // 給產生的數據自動添加時間戳.load// 把行切割成單詞, 保留時間戳val words: DataFrame = lines.as[(String, Timestamp)].flatMap((line: (String, Timestamp)) => {line._1.split(" ").map(((_: String), line._2))}).toDF("word", "timestamp")// 按照窗口和單詞分組, 并且計算每組的單詞的個數,最后按照窗口排序val wordCounts: Dataset[Row] = words.groupBy(// 調用 window 函數, 返回的是一個 Column 類型// 參數 1: df 中表示時間戳的列// 參數 2: 窗口長度// 參數 3: 滑動步長window($"timestamp", "60 seconds", "10 seconds"),$"word").count().orderBy($"window")wordCounts.writeStream.outputMode("complete").format("console").option("truncate", "false") // 不截斷.為了在控制臺能看到完整信息, 最好設置為 false.start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}

結果輸出:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a   |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a   |3    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2    |
+------------------------------------------+----+-----+-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a   |3    |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5    |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a   |3    |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5    |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a   |3    |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a   |3    |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2    |
|[2023-08-02 21:19:10, 2023-08-02 21:20:10]|a   |2    |
|[2023-08-02 21:19:10, 2023-08-02 21:20:10]|abc |1    |
|[2023-08-02 21:19:20, 2023-08-02 21:20:20]|a   |2    |
|[2023-08-02 21:19:20, 2023-08-02 21:20:20]|abc |1    |
|[2023-08-02 21:19:30, 2023-08-02 21:20:30]|a   |2    |
|[2023-08-02 21:19:30, 2023-08-02 21:20:30]|abc |1    |
|[2023-08-02 21:19:40, 2023-08-02 21:20:40]|a   |2    |
+------------------------------------------+----+-----+
only showing top 20 rows

由此可以看出, 在這種窗口機制下, 無論事件何時到達, 以怎樣的順序到達, Structured Streaming 總會根據事件時間生成對應的若干個時間窗口, 然后按照指定的規(guī)則聚合。

2.2.?event-time 窗口生成規(guī)則

可以查看?org.apache.spark.sql.catalyst.analysis.TimeWindowing 類下的如下代碼:

The windows are calculated as below:
maxNumOverlapping <- ceil(windowDuration / slideDuration)
for (i <- 0 until maxNumOverlapping)windowId <- ceil((timestamp - startTime) / slideDuration)windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTimewindowEnd <- windowStart + windowDurationreturn windowStart, windowEnd

????????將event-time 作為“初始窗口”的結束時間, 然后按照窗口滑動寬度逐漸向時間軸前方推進, 直到某個窗口不再包含該 event-time 為止。 最終以“初始窗口”與“結束窗口”之間的若干個窗口作為最終生成的 event-time 的時間窗口。

每個窗口的起始時間與結束時間都是前必后開的區(qū)間, 因此初始窗口和結束窗口都不會包含 event-time, 最終不會被使用。

得到窗口如下:

3.?基于 Watermark 處理延遲數據

3.1. 什么是 Watermark 機制

????????在數據分析系統(tǒng)中, Structured Streaming 可以持續(xù)的按照 event-time 聚合數據, 然而在此過程中并不能保證數據按照時間的先后依次到達。 例如: 當前接收的某一條數據的 event-time 可能遠遠早于之前已經處理過的 event-time。 在發(fā)生這種情況時, 往往需要結合業(yè)務需求對延遲數據進行過濾?,F在考慮如果事件延遲到達會有哪些影響。 假如, 一個單詞在 12:04(event-time) 產生, 在 12:11 到達應用。 應用應該使用 12:04 來在窗口(12:00 - 12:10)中更新計數, 而不是使用 12:11。 這些情況在我們基于窗口的聚合中是自然發(fā)生的, 因為結構化流可以長時間維持部分聚合的中間狀態(tài)。

????????但是, 如果這個查詢運行數天, 系統(tǒng)很有必要限制內存中累積的中間狀態(tài)的數量。 這意味著系統(tǒng)需要知道何時從內存狀態(tài)中刪除舊聚合, 因為應用不再接受該聚合的后期數據。為了實現這個需求, 從 spark2.1, 引入了 watermark(水印), 使用引擎可以自動的跟蹤當前的事件時間, 并據此嘗試刪除舊狀態(tài)。通過指定 event-time 列和預估事件的延遲時間上限來定義一個查詢的 watermark。 針對一個以時間 T 結束的窗口, 引擎會保留狀態(tài)和允許延遲時間直到(max event time seen by the engine - late threshold > T)。 換句話說, 延遲時間在上限內的被聚合, 延遲時間超出上限的開始被丟棄。

????????可以通過withWatermark() 來定義watermark,watermark 計算方式:watermark = MaxEventTime - Threshhod;而且, watermark只能逐漸增加, 不能減少。

Structured Streaming 引入 Watermark 機制, 主要是為了解決以下兩個問題:

  • 處理聚合中的延遲數據
  • 減少內存中維護的聚合狀態(tài).

注意:在不同輸出模式(complete, append, update)中, Watermark 會產生不同的影響。

3.2.?update 模式下使用 watermark

在 update 模式下, 僅輸出與之前批次的結果相比, 涉及更新或新增的數據。

代碼示例如下:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數據源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 輸入的數據中包含時間戳, 而不是自動添加的時間戳val words: DataFrame = lines.as[String].flatMap((line: String) => {val split: Array[String] = line.split(",")split(1).split(" ").map(((_: String), Timestamp.valueOf(split(0))))}).toDF("word", "timestamp")// 使用 withWatermark 方法,添加watermark, 參數 1: event-time 所在列的列名 參數 2: 延遲時間的上限.val wordCounts: Dataset[Row] = words.withWatermark("timestamp", "2 minutes").groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word").count()// 數據輸出val query: StreamingQuery = wordCounts.writeStream.outputMode("update").trigger(Trigger.ProcessingTime(1000)).format("console").option("truncate", "false").startquery.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}

初始化的wartmark是 0,通過如下輸入的幾條數據,可以看到水位線的變化。

第一次輸入數據:??2023-08-07 10:55:00,dog 。這個條數據作為第一批數據。 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 個窗口。 由于是第一批, 所有的窗口的結束時間都大于 wartermark(0), 所以 5 個窗口都顯示,如下所示:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
+------------------------------------------+----+-----+

然后根據當前批次中最大的 event-time, 計算出來下次使用的 watermark. 本批次只有一個數據(10:55), 所有: watermark = 10:55 - 2min = 10:53 。

第二次輸入數據:??2023-08-07 11:00:00,dog?。 這條數據作為第二批數據, 計算得到 5 個窗口。 此時的watermark=10:53, 所有的窗口的結束時間均大于 watermark。 在 update 模式下, 只輸出結果表中涉及更新或新增的數據。

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:58:00, 2023-08-07 11:08:00]|dog |1    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |2    |
|[2023-08-07 10:56:00, 2023-08-07 11:06:00]|dog |1    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |2    |
|[2023-08-07 11:00:00, 2023-08-07 11:10:00]|dog |1    |
+------------------------------------------+----+-----+

其中: count 是 2 的表示更新, count 是 1 的表示新增。 沒有變化的就沒有顯示(但是內存中仍然保存著)。此時的的 watermark = 11:00 - 2min = 10:58 。如下數據為在內存中保存著,但是沒有打印出來的數據:

|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1    |

第三次輸入數據:? ?2023-08-07 10:55:00,dog? 。?這條數據作為第 3 批次,相當于一條延遲數據,計算得到 5 個窗口。此時的 watermark = 10:58 當前內存中有兩個窗口的結束時間已經低于 10: 58。

|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |

則立即刪除這兩個窗口在內存中的維護狀態(tài)。 同時, 當前批次中新加入的數據所劃分出來的窗口, 如果窗口結束時間低于 11:58, 則窗口會被過濾掉。

所以這次輸出結果:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |2    |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |3    |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |3    |
+------------------------------------------+----+-----+

第三個批次的數據處理完成后, 立即計算: watermark= 10:55 - 2min = 10:53, 這個值小于當前的 watermask(10:58), 所以保持不變(因為 watermask 只能增加不能減少)。

3.3.?append 模式下使用 wartermark

代碼示例如下:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數據源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 輸入的數據中包含時間戳, 而不是自動添加的時間戳val words: DataFrame = lines.as[String].flatMap((line: String) => {val split: Array[String] = line.split(",")split(1).split(" ").map(((_: String), Timestamp.valueOf(split(0))))}).toDF("word", "timestamp")// 使用 withWatermark 方法,添加watermark, 參數 1: event-time 所在列的列名 參數 2: 延遲時間的上限.val wordCounts: Dataset[Row] = words.withWatermark("timestamp", "2 minutes").groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word").count()// 數據輸出val query: StreamingQuery = wordCounts.writeStream.outputMode("append").trigger(Trigger.ProcessingTime(0)).format("console").option("truncate", "false").startquery.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}

在 append 模式中, 僅輸出新增的數據, 且輸出后的數據無法變更。

第一次輸入數據:?2023-08-07 10:55:00,dog? 。?這個條數據作為第一批數據。 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 個窗口。 由于此時初始 watermask=0, 當前批次中所有窗口的結束時間均大于 watermask。但是 Structured Streaming 無法確定后續(xù)批次的數據中是否會更新當前批次的內容。 因此, 基于 Append 模式的特點, 這時并不會輸出任何數據(因為輸出后數據就無法更改了), 直到某個窗口的結束時間小于 watermask, 即可以確定后續(xù)數據不會再變更該窗口的聚合結果時才會將其輸出, 并移除內存中對應窗口的聚合狀態(tài)。

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

然后根據當前批次中最大的 event-time, 計算出來下次使用的 watermark。 本批次只有一個數據(10:55), 所有: watermark = 10:55 - 2min = 10:53

第二次輸入數據:?2023-08-07 11:00:00,dog? 。這條數據作為第二批數據, 計算得到 5 個窗口。 此時的watermark=10:53, 所有的窗口的結束時間均大于 watermark, 仍然不會輸出。

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

然后計算 watermark = 11:00 - 2min = 10:58

第三次輸入數據:?2023-08-07 10:55:00,dog 。相當于一條延遲數據,這條數據作為第 3 批次, 計算得到 5 個窗口。 此時的 watermark = 10:58 當前內存中有兩個窗口的結束時間已經低于 10: 58。

|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |

則意味著這兩個窗口的數據不會再發(fā)生變化, 此時輸出這個兩個窗口的聚合結果, 并在內存中清除這兩個窗口的狀態(tài)。所以這次輸出結果:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1    |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1    |
+------------------------------------------+----+-----+

第三個批次的數據處理完成后, 立即計算: watermark= 10:55 - 2min = 10:53, 這個值小于當前的 watermask(10:58), 所以保持不變。(因為 watermask 只能增加不能減少)

3.4. watermark 機制總結

  • watermark 在用于基于時間的狀態(tài)聚合操作時, 該時間可以基于窗口, 也可以基于 event-timeb本身。
  • 輸出模式必須是append或update。 在輸出模式是complete的時候(必須有聚合), 要求每次輸出所有的聚合結果。 我們使用 watermark 的目的是丟棄一些過時聚合數據, 所以complete模式使用wartermark無效也無意義。
  • 在輸出模式是append時, 必須設置 watermask 才能使用聚合操作。 其實, watermask 定義了 append 模式中何時輸出聚合聚合結果(狀態(tài)), 并清理過期狀態(tài)。
  • 在輸出模式是update時, watermask 主要用于過濾過期數據并及時清理過期狀態(tài)。
  • watermask 會在處理當前批次數據時更新, 并且會在處理下一個批次數據時生效使用。 但如果節(jié)點發(fā)送故障, 則可能延遲若干批次生效。
  • withWatermark 必須使用與聚合操作中的時間戳列是同一列。df.withWatermark("time", "1 min").groupBy("time2").count() 無效。
  • withWatermark 必須在聚合之前調用。 f.groupBy("time").count().withWatermark("time", "1 min") 無效。

4.?流數據去重

需求內容:根據唯一的 id 實現數據去重

代碼示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數據源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 數據預處理val words: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), Timestamp.valueOf(arr(1)), arr(2))}).toDF("uid", "ts", "word")// 去重重復數據 uid 相同就是重復.  可以傳遞多個列val wordCounts: Dataset[Row] = words.withWatermark("ts", "2 minutes").dropDuplicates("uid")// 輸出數據wordCounts.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}

數據輸入(按順序從上到下):

1,2023-08-09 11:50:00,dog
2,2023-08-09 11:51:00,dog
1,2023-08-09 11:50:00,dog
3,2023-08-09 11:53:00,dog
1,2023-08-09 11:50:00,dog
4,2023-08-09 11:45:00,dog

注意點:

  • dropDuplicates 不可用在聚合之后, 即通過聚合得到的 df/ds 不能調用dropDuplicates?
  • 使用watermask - 如果重復記錄的到達時間有上限,則可以在事件時間列上定義水印,并使用guid和事件時間列進行重復數據刪除。該查詢將使用水印從過去的記錄中刪除舊的狀態(tài)數據,這些記錄不會再被重復。這限制了查詢必須維護的狀態(tài)量。?
  • 沒有watermask - 由于重復記錄可能到達時沒有界限,查詢將來自所有過去記錄的數據存儲為狀態(tài)。

測試:

  • 第一次輸入數據:1,2023-08-09 11:50:00,dog
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  1|2023-08-09 11:50:00| dog|
+---+-------------------+----+
  • 第二次輸入數據:2,2023-08-09 11:51:00,dog
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  2|2023-08-09 11:51:00| dog|
+---+-------------------+----+
  • 第三次輸入數據:1,2023-08-09 11:50:00,dog (id 重復無輸出)
  • 第四次輸入數據:3,2023-08-09 11:53:00,dog (此時 watermask=11:51)
+---+-------------------+----+
|uid|                 ts|word|
+---+-------------------+----+
|  3|2023-08-09 11:53:00| dog|
+---+-------------------+----+
  • 第五次輸入數據:1,2023-08-09 11:50:00,dog (數據重復, 并且數據過期, 所以無輸出)
  • 第六次輸入數據:4,2023-08-09 11:45:00,dog (數據過時, 所以無輸出)

5. join操作

????????Structured Streaming 支持 streaming DataSet/DataFrame 與靜態(tài)的DataSet/DataFrame 進行 join, 也支持 streaming DataSet/DataFrame與另外一個streaming DataSet/DataFrame 進行 join。join 的結果也是持續(xù)不斷的生成, 類似于前面的 streaming 的聚合結果。

5.1.?Stream-static Joins

靜態(tài)數據:

lisi,male
zhiling,female
zs,male

流式數據:

lisi,20
zhiling,40
ww,30

5.1.1.?內連接

代碼示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 1. 靜態(tài) dfval arr: Array[(String, String)] = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));val staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")// 2. 流式 dfval lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val streamDF: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt)}).toDF("name", "age")// 3. join   等值內連接  a.name=b.nameval joinResult: DataFrame = streamDF.join(staticDF, "name")// 4. 輸出joinResult.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}

數據輸出:

+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|zhiling| 40|female|
|   lisi| 20|  male|
+-------+---+------+

5.1.2.?外連接

代碼示例:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 1. 靜態(tài) dfval arr: Array[(String, String)] = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));val staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")// 2. 流式 dfval lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val streamDF: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt)}).toDF("name", "age")// 3. joinval joinResult: DataFrame = streamDF.join(staticDF, Seq("name"), "left")// 4. 輸出joinResult.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}

數據輸出:

+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|zhiling| 40|female|
|     ww| 30|  null|
|   lisi| 20|  male|
+-------+---+------+

5.2.?Stream-stream Joins

????????在 Spark2.3, 開始支持 stream-stream join。Spark 會自動維護兩個流的狀態(tài), 以保障后續(xù)流入的數據能夠和之前流入的數據發(fā)生 join 操作, 但這會導致狀態(tài)無限增長。 因此, 在對兩個流進行 join 操作時, 依然可以用 watermark 機制來消除過期的狀態(tài), 避免狀態(tài)無限增長。

第 1 個數據格式:姓名,年齡,事件時間

lisi,female,2023-08-09 11:50:00
zs,male,2023-08-09 11:51:00
ww,female,2023-08-09 11:52:00
zhiling,female,2023-08-09 11:53:00
fengjie,female,2023-08-09 11:54:00
yifei,female,2023-08-09 11:55:00

第 2?個數據格式:姓名,年齡,事件時間

lisi,18,2023-08-09 11:50:00
zs,19,2023-08-09 11:51:00
ww,20,2023-08-09 11:52:00
zhiling,22,2023-08-09 11:53:00
yifei,30,2023-08-09 11:54:00
fengjie,98,2023-08-09 11:55:00

5.2.1.?inner join

對 2 個流式數據進行 join 操作,輸出模式僅支持append模式。

不帶 watermast 的 inner join(join 的速度很慢):

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 個 streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name", "sex", "ts1")// 第 2 個 streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name", "age", "ts2")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name")// 數據輸出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}//      數據輸出:
//      +-------+------+-------------------+---+-------------------+
//      |   name|   sex|                ts1|age|                ts2|
//      +-------+------+-------------------+---+-------------------+
//      |zhiling|female|2023-08-09 11:53:00| 22|2023-08-09 11:53:00|
//      |     ww|female|2023-08-09 11:52:00| 20|2023-08-09 11:52:00|
//      |  yifei|female|2023-08-09 11:55:00| 30|2023-08-09 11:54:00|
//      |     zs|  male|2023-08-09 11:51:00| 19|2023-08-09 11:51:00|
//      |fengjie|female|2023-08-09 11:54:00| 98|2023-08-09 11:55:00|
//      |   lisi|female|2023-08-09 11:50:00| 18|2023-08-09 11:50:00|
//      +-------+------+-------------------+---+-------------------+

帶 watermast 的 inner join(join 的速度很慢):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 個 streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name1", "sex", "ts1")// 第 2 個 streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name2", "age", "ts2").withWatermark("ts2", "1 minutes")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream,expr("""|name1=name2 and|ts2 >= ts1 and|ts2 <= ts1 + interval 1 minutes""".stripMargin))// 數據輸出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}//      數據輸出:
//      +-------+------+-------------------+-------+---+-------------------+
//      |  name1|   sex|                ts1|  name2|age|                ts2|
//      +-------+------+-------------------+-------+---+-------------------+
//      |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00|
//      |     ww|female|2023-08-09 11:52:00|     ww| 20|2023-08-09 11:52:00|
//      |     zs|  male|2023-08-09 11:51:00|     zs| 19|2023-08-09 11:51:00|
//      |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00|
//      |   lisi|female|2023-08-09 11:50:00|   lisi| 18|2023-08-09 11:50:00|
//      +-------+------+-------------------+-------+---+-------------------+

4.2.2.?outer join

外連接必須使用 watermast,和內連接相比, 代碼幾乎一致, 只需要在連接的時候指定下連接類型即可:joinType = "left"。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 個 streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name1", "sex", "ts1")// 第 2 個 streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name2", "age", "ts2").withWatermark("ts2", "1 minutes")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream,expr("""|name1=name2 and|ts2 >= ts1 and|ts2 <= ts1 + interval 1 minutes""".stripMargin),joinType = "left")// 數據輸出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}//      數據輸出:
//        +-------+------+-------------------+-------+---+-------------------+
//        |  name1|   sex|                ts1|  name2|age|                ts2|
//        +-------+------+-------------------+-------+---+-------------------+
//        |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00|
//        |     ww|female|2023-08-09 11:52:00|     ww| 20|2023-08-09 11:52:00|
//        |     zs|  male|2023-08-09 11:51:00|     zs| 19|2023-08-09 11:51:00|
//        |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00|
//        |   lisi|female|2023-08-09 11:50:00|   lisi| 18|2023-08-09 11:50:00|
//        +-------+------+-------------------+-------+---+-------------------+

6.?Streaming DF/DS 不支持的操作

到目前, DF/DS 的有些操作 Streaming DF/DS 還不支持:

  • 多個Streaming 聚合(例如在 DF 上的聚合鏈)目前還不支持
  • limit 和取前 N 行還不支持
  • distinct 也不支持
  • 僅僅支持對 complete 模式下的聚合操作進行排序操作
  • 僅支持有限的外連接
  • 有些方法不能直接用于查詢和返回結果, 因為他們用在流式數據上沒有意義
    • count() 不能返回單行數據, 必須是s.groupBy().count()
    • foreach() 不能直接使用, 而是使用: ds.writeStream.foreach(...)
    • show() 不能直接使用, 而是使用 console sink

如果執(zhí)行上面操作會看到這樣的異常: operation XYZ is not supported with streaming DataFrames/Datasets。


注:其他Spark相關系列文章鏈接由此進 ->??Spark文章匯總?


http://aloenet.com.cn/news/46351.html

相關文章:

  • 網站加載效果怎么做的百度推廣代運營
  • wordpress mysql重啟資源網站優(yōu)化排名軟件公司
  • 成華網站制作為什么中國禁止谷歌瀏覽器
  • 標題優(yōu)化方法郴州seo快速排名
  • 今日濮陽重大新聞seo優(yōu)化服務是什么意思
  • asp.net做的網站要放到網上空間去_要放哪些文件上去網站建網站建設網站
  • 房山新農村建設網站深圳百度seo公司
  • 免費代理ip的網站百度搜索推廣操作簡要流程
  • 如何進行網站運營與規(guī)劃打開百度網頁
  • 國外做批發(fā)配件的 在哪個網站百度葷seo公司
  • 傳奇網站怎么制作教程查關鍵詞
  • windows做網站服務器杭州推廣公司
  • 什么是網站功能需求推推蛙品牌策劃
  • 有服務器可以做網站嗎站長工具是什么意思
  • 商業(yè)網站建立搜索引擎優(yōu)化seo應用
  • 中國三農建設工作委員會官方網站深圳網絡推廣最新招聘
  • 云服務器可以做網站嗎網絡營銷的一般流程
  • 知名企業(yè)網站建設哈爾濱網站制作軟件
  • 做鋼材的網站有哪些網站的網站建設
  • 個人網站畢業(yè)設計搜索關鍵詞然后排名怎樣提升
  • 國外兒童社區(qū)網站模板外鏈信息
  • 做微網站迅宇科技網店推廣是什么
  • 做網站的不給做robots文件百度推廣登錄后臺
  • 在百度上做網站多少錢百度收錄提交
  • 杭州旅游 網站建設必應搜索引擎地址
  • 一個網站可以做多少個小程序營銷推廣方案
  • 做301跳轉會影響之前網站排名嗎上海谷歌推廣
  • 在國外網站做中國旅游推廣百度關鍵詞熱度排名
  • 什么專業(yè)可以做網站百度店鋪免費入駐
  • 學校定制網站建設公司深圳優(yōu)化公司高粱seo較