dw企業(yè)網站設計品牌營銷包括哪些內容
目錄
0. 相關文章鏈接
1. 基本操作
1.1.?弱類型 api
1.2. 強類型
1.3.?直接執(zhí)行 sql
2.?基于 event-time 的窗口操作
2.1.?event-time 窗口理解
2.2.?event-time 窗口生成規(guī)則
3.?基于 Watermark 處理延遲數據
3.1. 什么是 Watermark 機制
3.2.?update 模式下使用 watermark
3.3.?append 模式下使用 wartermark
3.4. watermark 機制總結
4.?流數據去重
5. join操作
5.1.?Stream-static Joins
5.1.1.?內連接
5.1.2.?外連接
5.2.?Stream-stream Joins
5.2.1.?inner join
4.2.2.?outer join
6.?Streaming DF/DS 不支持的操作
0. 相關文章鏈接
?Spark文章匯總?
1. 基本操作
在 DF/DS 上大多數通用操作都支持作用在 Streaming DataFrame/Streaming DataSet 上。
準備處理數據:?people.json
{"name": "Michael","age": 29,"sex": "female"}
{"name": "Andy","age": 30,"sex": "male"}
{"name": "Justin","age": 19,"sex": "male"}
{"name": "Lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}
1.1.?弱類型 api
代碼示例:
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 創(chuàng)建格式,并讀取數據val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 弱類型 apival df: DataFrame = peopleDF.select("name", "age", "sex").where("age > 20")df.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}
結果輸出:
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+------+
| name|age| sex|
+-------+---+------+
|Michael| 29|female|
| Andy| 30| male|
|zhiling| 40|female|
+-------+---+------+
1.2. 強類型
代碼示例:
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 創(chuàng)建格式,并讀取數據val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 強類型,轉成 dsval peopleDS: Dataset[People] = peopleDF.as[People]val df: Dataset[String] = peopleDS.filter((_: People).age > 20).map((_: People).name)df.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}case class People(name: String, age: Long, sex: String)
結果輸出:
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+
| value|
+-------+
|Michael|
| Andy|
|zhiling|
+-------+
1.3.?直接執(zhí)行 sql
代碼示例:
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 創(chuàng)建格式,并讀取數據val peopleSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("sex", StringType)val peopleDF: DataFrame = spark.readStream.schema(peopleSchema).json("/Project/Data/json")// 直接執(zhí)行SQL,創(chuàng)建臨時表peopleDF.createOrReplaceTempView("people")val df: DataFrame = spark.sql("select * from people where age > 20")df.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}
結果輸出:
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+------+
| name|age| sex|
+-------+---+------+
|Michael| 29|female|
| Andy| 30| male|
|zhiling| 40|female|
+-------+---+------+
2.?基于 event-time 的窗口操作
2.1.?event-time 窗口理解
????????在 Structured Streaming 中, 可以按照事件發(fā)生時的時間對數據進行聚合操作, 即基于 event-time 進行操作。在這種機制下, 即不必考慮 Spark 陸續(xù)接收事件的順序是否與事件發(fā)生的順序一致, 也不必考慮事件到達 Spark 的時間與事件發(fā)生時間的關系。因此, 它在提高數據處理精度的同時, 大大減少了開發(fā)者的工作量。我們現在想計算 10 分鐘內的單詞, 每 5 分鐘更新一次, 也就是說在 10 分鐘窗口 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20等之間收到的單詞量。 注意, 12:00 - 12:10 表示數據在 12:00 之后 12:10 之前到達?,F在,考慮一下在 12:07 收到的單詞。單詞應該增加對應于兩個窗口12:00 - 12:10和12:05 - 12:15的計數。因此,計數將由分組鍵(即單詞)和窗口(可以從事件時間計算)索引。
統(tǒng)計后的結果應該是這樣的:
代碼示例:
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數據源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).option("includeTimestamp", value = true) // 給產生的數據自動添加時間戳.load// 把行切割成單詞, 保留時間戳val words: DataFrame = lines.as[(String, Timestamp)].flatMap((line: (String, Timestamp)) => {line._1.split(" ").map(((_: String), line._2))}).toDF("word", "timestamp")// 按照窗口和單詞分組, 并且計算每組的單詞的個數,最后按照窗口排序val wordCounts: Dataset[Row] = words.groupBy(// 調用 window 函數, 返回的是一個 Column 類型// 參數 1: df 中表示時間戳的列// 參數 2: 窗口長度// 參數 3: 滑動步長window($"timestamp", "60 seconds", "10 seconds"),$"word").count().orderBy($"window")wordCounts.writeStream.outputMode("complete").format("console").option("truncate", "false") // 不截斷.為了在控制臺能看到完整信息, 最好設置為 false.start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}
結果輸出:
-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3 |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a |3 |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5 |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5 |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a |3 |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a |3 |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5 |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5 |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a |3 |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a |3 |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5 |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a |3 |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2 |
+------------------------------------------+----+-----+-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|[2023-08-02 21:18:00, 2023-08-02 21:19:00]|abc |3 |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|a |3 |
|[2023-08-02 21:18:10, 2023-08-02 21:19:10]|abc |5 |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|abc |5 |
|[2023-08-02 21:18:20, 2023-08-02 21:19:20]|a |3 |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|a |3 |
|[2023-08-02 21:18:30, 2023-08-02 21:19:30]|abc |5 |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|abc |5 |
|[2023-08-02 21:18:40, 2023-08-02 21:19:40]|a |3 |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|a |3 |
|[2023-08-02 21:18:50, 2023-08-02 21:19:50]|abc |5 |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|a |3 |
|[2023-08-02 21:19:00, 2023-08-02 21:20:00]|abc |2 |
|[2023-08-02 21:19:10, 2023-08-02 21:20:10]|a |2 |
|[2023-08-02 21:19:10, 2023-08-02 21:20:10]|abc |1 |
|[2023-08-02 21:19:20, 2023-08-02 21:20:20]|a |2 |
|[2023-08-02 21:19:20, 2023-08-02 21:20:20]|abc |1 |
|[2023-08-02 21:19:30, 2023-08-02 21:20:30]|a |2 |
|[2023-08-02 21:19:30, 2023-08-02 21:20:30]|abc |1 |
|[2023-08-02 21:19:40, 2023-08-02 21:20:40]|a |2 |
+------------------------------------------+----+-----+
only showing top 20 rows
由此可以看出, 在這種窗口機制下, 無論事件何時到達, 以怎樣的順序到達, Structured Streaming 總會根據事件時間生成對應的若干個時間窗口, 然后按照指定的規(guī)則聚合。
2.2.?event-time 窗口生成規(guī)則
可以查看?org.apache.spark.sql.catalyst.analysis.TimeWindowing 類下的如下代碼:
The windows are calculated as below:
maxNumOverlapping <- ceil(windowDuration / slideDuration)
for (i <- 0 until maxNumOverlapping)windowId <- ceil((timestamp - startTime) / slideDuration)windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTimewindowEnd <- windowStart + windowDurationreturn windowStart, windowEnd
????????將event-time 作為“初始窗口”的結束時間, 然后按照窗口滑動寬度逐漸向時間軸前方推進, 直到某個窗口不再包含該 event-time 為止。 最終以“初始窗口”與“結束窗口”之間的若干個窗口作為最終生成的 event-time 的時間窗口。
每個窗口的起始時間與結束時間都是前必后開的區(qū)間, 因此初始窗口和結束窗口都不會包含 event-time, 最終不會被使用。
得到窗口如下:
3.?基于 Watermark 處理延遲數據
3.1. 什么是 Watermark 機制
????????在數據分析系統(tǒng)中, Structured Streaming 可以持續(xù)的按照 event-time 聚合數據, 然而在此過程中并不能保證數據按照時間的先后依次到達。 例如: 當前接收的某一條數據的 event-time 可能遠遠早于之前已經處理過的 event-time。 在發(fā)生這種情況時, 往往需要結合業(yè)務需求對延遲數據進行過濾?,F在考慮如果事件延遲到達會有哪些影響。 假如, 一個單詞在 12:04(event-time) 產生, 在 12:11 到達應用。 應用應該使用 12:04 來在窗口(12:00 - 12:10)中更新計數, 而不是使用 12:11。 這些情況在我們基于窗口的聚合中是自然發(fā)生的, 因為結構化流可以長時間維持部分聚合的中間狀態(tài)。
????????但是, 如果這個查詢運行數天, 系統(tǒng)很有必要限制內存中累積的中間狀態(tài)的數量。 這意味著系統(tǒng)需要知道何時從內存狀態(tài)中刪除舊聚合, 因為應用不再接受該聚合的后期數據。為了實現這個需求, 從 spark2.1, 引入了 watermark(水印), 使用引擎可以自動的跟蹤當前的事件時間, 并據此嘗試刪除舊狀態(tài)。通過指定 event-time 列和預估事件的延遲時間上限來定義一個查詢的 watermark。 針對一個以時間 T 結束的窗口, 引擎會保留狀態(tài)和允許延遲時間直到(max event time seen by the engine - late threshold > T)。 換句話說, 延遲時間在上限內的被聚合, 延遲時間超出上限的開始被丟棄。
????????可以通過withWatermark() 來定義watermark,watermark 計算方式:watermark = MaxEventTime - Threshhod;而且, watermark只能逐漸增加, 不能減少。
Structured Streaming 引入 Watermark 機制, 主要是為了解決以下兩個問題:
- 處理聚合中的延遲數據
- 減少內存中維護的聚合狀態(tài).
注意:在不同輸出模式(complete, append, update)中, Watermark 會產生不同的影響。
3.2.?update 模式下使用 watermark
在 update 模式下, 僅輸出與之前批次的結果相比, 涉及更新或新增的數據。
代碼示例如下:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數據源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 輸入的數據中包含時間戳, 而不是自動添加的時間戳val words: DataFrame = lines.as[String].flatMap((line: String) => {val split: Array[String] = line.split(",")split(1).split(" ").map(((_: String), Timestamp.valueOf(split(0))))}).toDF("word", "timestamp")// 使用 withWatermark 方法,添加watermark, 參數 1: event-time 所在列的列名 參數 2: 延遲時間的上限.val wordCounts: Dataset[Row] = words.withWatermark("timestamp", "2 minutes").groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word").count()// 數據輸出val query: StreamingQuery = wordCounts.writeStream.outputMode("update").trigger(Trigger.ProcessingTime(1000)).format("console").option("truncate", "false").startquery.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}
初始化的wartmark是 0,通過如下輸入的幾條數據,可以看到水位線的變化。
第一次輸入數據:??2023-08-07 10:55:00,dog 。這個條數據作為第一批數據。 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 個窗口。 由于是第一批, 所有的窗口的結束時間都大于 wartermark(0), 所以 5 個窗口都顯示,如下所示:
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1 |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |1 |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1 |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |1 |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1 |
+------------------------------------------+----+-----+
然后根據當前批次中最大的 event-time, 計算出來下次使用的 watermark. 本批次只有一個數據(10:55), 所有: watermark = 10:55 - 2min = 10:53 。
第二次輸入數據:??2023-08-07 11:00:00,dog?。 這條數據作為第二批數據, 計算得到 5 個窗口。 此時的watermark=10:53, 所有的窗口的結束時間均大于 watermark。 在 update 模式下, 只輸出結果表中涉及更新或新增的數據。
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:58:00, 2023-08-07 11:08:00]|dog |1 |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |2 |
|[2023-08-07 10:56:00, 2023-08-07 11:06:00]|dog |1 |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |2 |
|[2023-08-07 11:00:00, 2023-08-07 11:10:00]|dog |1 |
+------------------------------------------+----+-----+
其中: count 是 2 的表示更新, count 是 1 的表示新增。 沒有變化的就沒有顯示(但是內存中仍然保存著)。此時的的 watermark = 11:00 - 2min = 10:58 。如下數據為在內存中保存著,但是沒有打印出來的數據:
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1 |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1 |
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |1 |
第三次輸入數據:? ?2023-08-07 10:55:00,dog? 。?這條數據作為第 3 批次,相當于一條延遲數據,計算得到 5 個窗口。此時的 watermark = 10:58 當前內存中有兩個窗口的結束時間已經低于 10: 58。
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1 |
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1 |
則立即刪除這兩個窗口在內存中的維護狀態(tài)。 同時, 當前批次中新加入的數據所劃分出來的窗口, 如果窗口結束時間低于 11:58, 則窗口會被過濾掉。
所以這次輸出結果:
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:50:00, 2023-08-07 11:00:00]|dog |2 |
|[2023-08-07 10:54:00, 2023-08-07 11:04:00]|dog |3 |
|[2023-08-07 10:52:00, 2023-08-07 11:02:00]|dog |3 |
+------------------------------------------+----+-----+
第三個批次的數據處理完成后, 立即計算: watermark= 10:55 - 2min = 10:53, 這個值小于當前的 watermask(10:58), 所以保持不變(因為 watermask 只能增加不能減少)。
3.3.?append 模式下使用 wartermark
代碼示例如下:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數據源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 輸入的數據中包含時間戳, 而不是自動添加的時間戳val words: DataFrame = lines.as[String].flatMap((line: String) => {val split: Array[String] = line.split(",")split(1).split(" ").map(((_: String), Timestamp.valueOf(split(0))))}).toDF("word", "timestamp")// 使用 withWatermark 方法,添加watermark, 參數 1: event-time 所在列的列名 參數 2: 延遲時間的上限.val wordCounts: Dataset[Row] = words.withWatermark("timestamp", "2 minutes").groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word").count()// 數據輸出val query: StreamingQuery = wordCounts.writeStream.outputMode("append").trigger(Trigger.ProcessingTime(0)).format("console").option("truncate", "false").startquery.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}
在 append 模式中, 僅輸出新增的數據, 且輸出后的數據無法變更。
第一次輸入數據:?2023-08-07 10:55:00,dog? 。?這個條數據作為第一批數據。 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 個窗口。 由于此時初始 watermask=0, 當前批次中所有窗口的結束時間均大于 watermask。但是 Structured Streaming 無法確定后續(xù)批次的數據中是否會更新當前批次的內容。 因此, 基于 Append 模式的特點, 這時并不會輸出任何數據(因為輸出后數據就無法更改了), 直到某個窗口的結束時間小于 watermask, 即可以確定后續(xù)數據不會再變更該窗口的聚合結果時才會將其輸出, 并移除內存中對應窗口的聚合狀態(tài)。
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+
然后根據當前批次中最大的 event-time, 計算出來下次使用的 watermark。 本批次只有一個數據(10:55), 所有: watermark = 10:55 - 2min = 10:53
第二次輸入數據:?2023-08-07 11:00:00,dog? 。這條數據作為第二批數據, 計算得到 5 個窗口。 此時的watermark=10:53, 所有的窗口的結束時間均大于 watermark, 仍然不會輸出。
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+
然后計算 watermark = 11:00 - 2min = 10:58
第三次輸入數據:?2023-08-07 10:55:00,dog 。相當于一條延遲數據,這條數據作為第 3 批次, 計算得到 5 個窗口。 此時的 watermark = 10:58 當前內存中有兩個窗口的結束時間已經低于 10: 58。
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1 |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1 |
則意味著這兩個窗口的數據不會再發(fā)生變化, 此時輸出這個兩個窗口的聚合結果, 并在內存中清除這兩個窗口的狀態(tài)。所以這次輸出結果:
+------------------------------------------+----+-----+
|window |word|count|
+------------------------------------------+----+-----+
|[2023-08-07 10:48:00, 2023-08-07 10:58:00]|dog |1 |
|[2023-08-07 10:46:00, 2023-08-07 10:56:00]|dog |1 |
+------------------------------------------+----+-----+
第三個批次的數據處理完成后, 立即計算: watermark= 10:55 - 2min = 10:53, 這個值小于當前的 watermask(10:58), 所以保持不變。(因為 watermask 只能增加不能減少)
3.4. watermark 機制總結
- watermark 在用于基于時間的狀態(tài)聚合操作時, 該時間可以基于窗口, 也可以基于 event-timeb本身。
- 輸出模式必須是append或update。 在輸出模式是complete的時候(必須有聚合), 要求每次輸出所有的聚合結果。 我們使用 watermark 的目的是丟棄一些過時聚合數據, 所以complete模式使用wartermark無效也無意義。
- 在輸出模式是append時, 必須設置 watermask 才能使用聚合操作。 其實, watermask 定義了 append 模式中何時輸出聚合聚合結果(狀態(tài)), 并清理過期狀態(tài)。
- 在輸出模式是update時, watermask 主要用于過濾過期數據并及時清理過期狀態(tài)。
- watermask 會在處理當前批次數據時更新, 并且會在處理下一個批次數據時生效使用。 但如果節(jié)點發(fā)送故障, 則可能延遲若干批次生效。
- withWatermark 必須使用與聚合操作中的時間戳列是同一列。df.withWatermark("time", "1 min").groupBy("time2").count() 無效。
- withWatermark 必須在聚合之前調用。 f.groupBy("time").count().withWatermark("time", "1 min") 無效。
4.?流數據去重
需求內容:根據唯一的 id 實現數據去重
代碼示例:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用socket數據源val lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load// 數據預處理val words: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), Timestamp.valueOf(arr(1)), arr(2))}).toDF("uid", "ts", "word")// 去重重復數據 uid 相同就是重復. 可以傳遞多個列val wordCounts: Dataset[Row] = words.withWatermark("ts", "2 minutes").dropDuplicates("uid")// 輸出數據wordCounts.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}
數據輸入(按順序從上到下):
1,2023-08-09 11:50:00,dog
2,2023-08-09 11:51:00,dog
1,2023-08-09 11:50:00,dog
3,2023-08-09 11:53:00,dog
1,2023-08-09 11:50:00,dog
4,2023-08-09 11:45:00,dog
注意點:
- dropDuplicates 不可用在聚合之后, 即通過聚合得到的 df/ds 不能調用dropDuplicates?
- 使用watermask - 如果重復記錄的到達時間有上限,則可以在事件時間列上定義水印,并使用guid和事件時間列進行重復數據刪除。該查詢將使用水印從過去的記錄中刪除舊的狀態(tài)數據,這些記錄不會再被重復。這限制了查詢必須維護的狀態(tài)量。?
- 沒有watermask - 由于重復記錄可能到達時沒有界限,查詢將來自所有過去記錄的數據存儲為狀態(tài)。
測試:
- 第一次輸入數據:1,2023-08-09 11:50:00,dog
+---+-------------------+----+
|uid| ts|word|
+---+-------------------+----+
| 1|2023-08-09 11:50:00| dog|
+---+-------------------+----+
- 第二次輸入數據:2,2023-08-09 11:51:00,dog
+---+-------------------+----+
|uid| ts|word|
+---+-------------------+----+
| 2|2023-08-09 11:51:00| dog|
+---+-------------------+----+
- 第三次輸入數據:1,2023-08-09 11:50:00,dog (id 重復無輸出)
- 第四次輸入數據:3,2023-08-09 11:53:00,dog (此時 watermask=11:51)
+---+-------------------+----+
|uid| ts|word|
+---+-------------------+----+
| 3|2023-08-09 11:53:00| dog|
+---+-------------------+----+
- 第五次輸入數據:1,2023-08-09 11:50:00,dog (數據重復, 并且數據過期, 所以無輸出)
- 第六次輸入數據:4,2023-08-09 11:45:00,dog (數據過時, 所以無輸出)
5. join操作
????????Structured Streaming 支持 streaming DataSet/DataFrame 與靜態(tài)的DataSet/DataFrame 進行 join, 也支持 streaming DataSet/DataFrame與另外一個streaming DataSet/DataFrame 進行 join。join 的結果也是持續(xù)不斷的生成, 類似于前面的 streaming 的聚合結果。
5.1.?Stream-static Joins
靜態(tài)數據:
lisi,male
zhiling,female
zs,male
流式數據:
lisi,20
zhiling,40
ww,30
5.1.1.?內連接
代碼示例:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 1. 靜態(tài) dfval arr: Array[(String, String)] = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));val staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")// 2. 流式 dfval lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val streamDF: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt)}).toDF("name", "age")// 3. join 等值內連接 a.name=b.nameval joinResult: DataFrame = streamDF.join(staticDF, "name")// 4. 輸出joinResult.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}
數據輸出:
+-------+---+------+
| name|age| sex|
+-------+---+------+
|zhiling| 40|female|
| lisi| 20| male|
+-------+---+------+
5.1.2.?外連接
代碼示例:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 1. 靜態(tài) dfval arr: Array[(String, String)] = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));val staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")// 2. 流式 dfval lines: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val streamDF: DataFrame = lines.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt)}).toDF("name", "age")// 3. joinval joinResult: DataFrame = streamDF.join(staticDF, Seq("name"), "left")// 4. 輸出joinResult.writeStream.outputMode("append").format("console").start.awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}
數據輸出:
+-------+---+------+
| name|age| sex|
+-------+---+------+
|zhiling| 40|female|
| ww| 30| null|
| lisi| 20| male|
+-------+---+------+
5.2.?Stream-stream Joins
????????在 Spark2.3, 開始支持 stream-stream join。Spark 會自動維護兩個流的狀態(tài), 以保障后續(xù)流入的數據能夠和之前流入的數據發(fā)生 join 操作, 但這會導致狀態(tài)無限增長。 因此, 在對兩個流進行 join 操作時, 依然可以用 watermark 機制來消除過期的狀態(tài), 避免狀態(tài)無限增長。
第 1 個數據格式:姓名,年齡,事件時間
lisi,female,2023-08-09 11:50:00
zs,male,2023-08-09 11:51:00
ww,female,2023-08-09 11:52:00
zhiling,female,2023-08-09 11:53:00
fengjie,female,2023-08-09 11:54:00
yifei,female,2023-08-09 11:55:00
第 2?個數據格式:姓名,年齡,事件時間
lisi,18,2023-08-09 11:50:00
zs,19,2023-08-09 11:51:00
ww,20,2023-08-09 11:52:00
zhiling,22,2023-08-09 11:53:00
yifei,30,2023-08-09 11:54:00
fengjie,98,2023-08-09 11:55:00
5.2.1.?inner join
對 2 個流式數據進行 join 操作,輸出模式僅支持append模式。
不帶 watermast 的 inner join(join 的速度很慢):
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 個 streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name", "sex", "ts1")// 第 2 個 streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name", "age", "ts2")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name")// 數據輸出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}// 數據輸出:
// +-------+------+-------------------+---+-------------------+
// | name| sex| ts1|age| ts2|
// +-------+------+-------------------+---+-------------------+
// |zhiling|female|2023-08-09 11:53:00| 22|2023-08-09 11:53:00|
// | ww|female|2023-08-09 11:52:00| 20|2023-08-09 11:52:00|
// | yifei|female|2023-08-09 11:55:00| 30|2023-08-09 11:54:00|
// | zs| male|2023-08-09 11:51:00| 19|2023-08-09 11:51:00|
// |fengjie|female|2023-08-09 11:54:00| 98|2023-08-09 11:55:00|
// | lisi|female|2023-08-09 11:50:00| 18|2023-08-09 11:50:00|
// +-------+------+-------------------+---+-------------------+
帶 watermast 的 inner join(join 的速度很慢):
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 個 streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name1", "sex", "ts1")// 第 2 個 streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name2", "age", "ts2").withWatermark("ts2", "1 minutes")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream,expr("""|name1=name2 and|ts2 >= ts1 and|ts2 <= ts1 + interval 1 minutes""".stripMargin))// 數據輸出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}// 數據輸出:
// +-------+------+-------------------+-------+---+-------------------+
// | name1| sex| ts1| name2|age| ts2|
// +-------+------+-------------------+-------+---+-------------------+
// |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00|
// | ww|female|2023-08-09 11:52:00| ww| 20|2023-08-09 11:52:00|
// | zs| male|2023-08-09 11:51:00| zs| 19|2023-08-09 11:51:00|
// |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00|
// | lisi|female|2023-08-09 11:50:00| lisi| 18|2023-08-09 11:50:00|
// +-------+------+-------------------+-------+---+-------------------+
4.2.2.?outer join
外連接必須使用 watermast,和內連接相比, 代碼幾乎一致, 只需要在連接的時候指定下連接類型即可:joinType = "left"。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}import java.sql.Timestampobject StreamTest {def main(args: Array[String]): Unit = {// 創(chuàng)建 SparkSession. 因為 ss 是基于 spark sql 引擎, 所以需要先創(chuàng)建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 第 1 個 streamval nameSexStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1), Timestamp.valueOf(arr(2)))}).toDF("name1", "sex", "ts1")// 第 2 個 streamval nameAgeStream: DataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load.as[String].map((line: String) => {val arr: Array[String] = line.split(",")(arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))}).toDF("name2", "age", "ts2").withWatermark("ts2", "1 minutes")// join 操作val joinResult: DataFrame = nameSexStream.join(nameAgeStream,expr("""|name1=name2 and|ts2 >= ts1 and|ts2 <= ts1 + interval 1 minutes""".stripMargin),joinType = "left")// 數據輸出joinResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()// 關閉執(zhí)行環(huán)境spark.stop()}
}// 數據輸出:
// +-------+------+-------------------+-------+---+-------------------+
// | name1| sex| ts1| name2|age| ts2|
// +-------+------+-------------------+-------+---+-------------------+
// |zhiling|female|2023-08-09 11:53:00|zhiling| 22|2023-08-09 11:53:00|
// | ww|female|2023-08-09 11:52:00| ww| 20|2023-08-09 11:52:00|
// | zs| male|2023-08-09 11:51:00| zs| 19|2023-08-09 11:51:00|
// |fengjie|female|2023-08-09 11:54:00|fengjie| 98|2023-08-09 11:55:00|
// | lisi|female|2023-08-09 11:50:00| lisi| 18|2023-08-09 11:50:00|
// +-------+------+-------------------+-------+---+-------------------+
6.?Streaming DF/DS 不支持的操作
到目前, DF/DS 的有些操作 Streaming DF/DS 還不支持:
- 多個Streaming 聚合(例如在 DF 上的聚合鏈)目前還不支持
- limit 和取前 N 行還不支持
- distinct 也不支持
- 僅僅支持對 complete 模式下的聚合操作進行排序操作
- 僅支持有限的外連接
- 有些方法不能直接用于查詢和返回結果, 因為他們用在流式數據上沒有意義
- count() 不能返回單行數據, 必須是s.groupBy().count()
- foreach() 不能直接使用, 而是使用: ds.writeStream.foreach(...)
- show() 不能直接使用, 而是使用 console sink
如果執(zhí)行上面操作會看到這樣的異常: operation XYZ is not supported with streaming DataFrames/Datasets。
注:其他Spark相關系列文章鏈接由此進 ->??Spark文章匯總?