国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

建站系統(tǒng)做的網(wǎng)站百度可以搜索到嗎網(wǎng)絡(luò)營銷策劃方案書范文

建站系統(tǒng)做的網(wǎng)站百度可以搜索到嗎,網(wǎng)絡(luò)營銷策劃方案書范文,開源網(wǎng)站建設(shè)實習(xí)心得,南寧手機網(wǎng)站開發(fā)策劃Flink 系列文章 1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接 13、Flink 的table api與sql的基本概念、通用api介紹及入門示例 14、Flink 的table api與sql之數(shù)據(jù)類型: 內(nèi)置數(shù)據(jù)類型以及它們的屬性 15、Flink 的ta…

Flink 系列文章

1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接

13、Flink 的table api與sql的基本概念、通用api介紹及入門示例
14、Flink 的table api與sql之數(shù)據(jù)類型: 內(nèi)置數(shù)據(jù)類型以及它們的屬性
15、Flink 的table api與sql之流式概念-詳解的介紹了動態(tài)表、時間屬性配置(如何處理更新結(jié)果)、時態(tài)表、流上的join、流上的確定性以及查詢配置
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和語法
19、Flink 的Table API 和 SQL 中的內(nèi)置函數(shù)及示例(1)
19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(2)
20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務(wù)到集群上

22、Flink 的table api與sql之創(chuàng)建表的DDL
24、Flink 的table api與sql之Catalogs(介紹、類型、java api和sql實現(xiàn)ddl、java api和sql操作catalog)-1
24、Flink 的table api與sql之Catalogs(java api操作數(shù)據(jù)庫、表)-2
24、Flink 的table api與sql之Catalogs(java api操作視圖)-3
24、Flink 的table api與sql之Catalogs(java api操作分區(qū)與函數(shù))-4

26、Flink 的SQL之概覽與入門示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介紹及詳細示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介紹及詳細示例(2)
27、Flink 的SQL之SELECT (窗口函數(shù))介紹及詳細示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介紹及詳細示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分組聚合、Over Aggregation Over聚合 和 Window Join 窗口關(guān)聯(lián))介紹及詳細示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介紹及詳細示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式檢測)介紹及詳細示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 語句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等)
32、Flink table api和SQL 之用戶自定義 Sources & Sinks實現(xiàn)及詳細示例
41、Flink之Hive 方言介紹及詳細示例
42、Flink 的table api與sql之Hive Catalog
43、Flink之Hive 讀寫及詳細驗證示例
44、Flink之module模塊介紹及使用示例和Flink SQL使用hive內(nèi)置函數(shù)及自定義函數(shù)詳細示例–網(wǎng)上有些說法好像是錯誤的


文章目錄

  • Flink 系列文章
  • 三、自定義函數(shù)
    • 1、概述
    • 2、開發(fā)指南
      • 1)、函數(shù)類
      • 2)、求值方法
      • 3)、類型推導(dǎo)
        • 1、自動類型推導(dǎo)
        • 2、定制類型推導(dǎo)
      • 4)、確定性
        • 1、內(nèi)置函數(shù)的確定性
      • 5)、運行時集成
    • 3、標量函數(shù)-自定義函數(shù)說明及示例
    • 4、表值函數(shù)-自定義函數(shù)說明及示例


本文介紹了flink的自定義函數(shù)概述、開發(fā)指南以及標量函數(shù)、表值函數(shù)的自定義函數(shù)實現(xiàn)及說明,提供的示例均可運行并提供運行結(jié)果供參考。
本文依賴flink集群能正常使用。
本文分為4個部分,即自定義函數(shù)的概述、開發(fā)指南、標量自定義函數(shù)的說明及示例、表值自定義函數(shù)的說明及示例。
本文的示例均在Flink 1.17版本中運行。

三、自定義函數(shù)

自定義函數(shù)(UDF)是一種擴展開發(fā)機制,可以用來在查詢語句里調(diào)用難以用其他方式表達的頻繁使用或自定義的邏輯。

自定義函數(shù)可以用 JVM 語言(例如 Java 或 Scala)或 Python 實現(xiàn),實現(xiàn)者可以在 UDF 中使用任意第三方庫,本文聚焦于使用 JVM 語言開發(fā)自定義函數(shù)。

1、概述

當前 Flink 有如下幾種函數(shù):

  • 標量函數(shù),將標量值轉(zhuǎn)換成一個新標量值;
  • 表值函數(shù),將標量值轉(zhuǎn)換成新的行數(shù)據(jù);
  • 聚合函數(shù),將多行數(shù)據(jù)里的標量值轉(zhuǎn)換成一個新標量值;
  • 表值聚合函數(shù),將多行數(shù)據(jù)里的標量值轉(zhuǎn)換成新的行數(shù)據(jù);
  • 異步表值函數(shù),是異步查詢外部數(shù)據(jù)系統(tǒng)的特殊函數(shù)。

標量和表值函數(shù)已經(jīng)使用了新的基于數(shù)據(jù)類型的類型系統(tǒng),聚合函數(shù)仍然使用基于 TypeInformation 的舊類型系統(tǒng)。

2、開發(fā)指南

在聚合函數(shù)使用新的類型系統(tǒng)前,本節(jié)僅適用于標量和表值函數(shù)。

所有的自定義函數(shù)都遵循一些基本的實現(xiàn)原則。

1)、函數(shù)類

實現(xiàn)類必須繼承自合適的基類之一(例如 org.apache.flink.table.functions.ScalarFunction )。

該類必須聲明為 public ,而不是 abstract ,并且可以被全局訪問。不允許使用非靜態(tài)內(nèi)部類或匿名類。

為了將自定義函數(shù)存儲在持久化的 catalog 中,該類必須具有默認構(gòu)造器,且在運行時可實例化。

Table API 中的匿名函數(shù)只有在函數(shù)不是有狀態(tài)的(stateful)(即僅包含瞬態(tài)和靜態(tài)(transient and static)字段)時才能持久化。

2)、求值方法

基類提供了一組可以被重寫的方法,例如 open()、 close() 或 isDeterministic() 。

但是,除了上述方法之外,作用于每條傳入記錄的主要邏輯還必須通過專門的 求值方法 來實現(xiàn)。

根據(jù)函數(shù)的種類,后臺生成的運算符會在運行時調(diào)用諸如 eval()、accumulate() 或 retract() 之類的求值方法。

這些方法必須聲明為 public ,并帶有一組定義明確的參數(shù)。

常規(guī)的 JVM 方法調(diào)用語義是適用的。因此可以:

  • 實現(xiàn)重載的方法,例如 eval(Integer) 和 eval(LocalDateTime);
  • 使用變長參數(shù),例如 eval(Integer…);
  • 使用對象繼承,例如 eval(Object) 可接受 LocalDateTime 和 Integer 作為參數(shù);
  • 也可組合使用,例如 eval(Object…) 可接受所有類型的參數(shù)。

示例片段

import org.apache.flink.table.functions.ScalarFunction;// 有多個重載求值方法的函數(shù)
public static class SumFunction extends ScalarFunction {//兩Integer數(shù)求和public Integer eval(Integer a, Integer b) {return a + b;}//兩String數(shù)轉(zhuǎn)換后求和public Integer eval(String a, String b) {return Integer.valueOf(a) + Integer.valueOf(b);}//多Double數(shù)據(jù)求和public Integer eval(Double... d) {double result = 0;for (double value : d)result += value;return (int) result;}
}

3)、類型推導(dǎo)

Table(類似于 SQL 標準)是一種強類型的 API。因此,函數(shù)的參數(shù)和返回類型都必須映射到數(shù)據(jù)類型。

從邏輯角度看,Planner 需要知道數(shù)據(jù)類型、精度和小數(shù)位數(shù);從 JVM 角度來看,Planner 在調(diào)用自定義函數(shù)時需要知道如何將內(nèi)部數(shù)據(jù)結(jié)構(gòu)表示為 JVM 對象。

術(shù)語 類型推導(dǎo) 概括了意在驗證輸入值、派生出參數(shù)/返回值數(shù)據(jù)類型的邏輯。

Flink 自定義函數(shù)實現(xiàn)了自動的類型推導(dǎo)提取,通過反射從函數(shù)的類及其求值方法中派生數(shù)據(jù)類型。如果這種隱式的反射提取方法不成功,則可以通過使用 @DataTypeHint 和 @FunctionHint 注解相關(guān)參數(shù)、類或方法來支持提取過程,下面展示了有關(guān)如何注解函數(shù)的例子。

如果需要更高級的類型推導(dǎo)邏輯,實現(xiàn)者可以在每個自定義函數(shù)中顯式重寫 getTypeInference() 方法。但是,建議使用注解方式,因為它可使自定義類型推導(dǎo)邏輯保持在受影響位置附近,而在其他位置則保持默認狀態(tài)。

1、自動類型推導(dǎo)

自動類型推導(dǎo)會檢查函數(shù)的類和求值方法,派生出函數(shù)參數(shù)和結(jié)果的數(shù)據(jù)類型, @DataTypeHint 和 @FunctionHint 注解支持自動類型推導(dǎo)。

有關(guān)可以隱式映射到數(shù)據(jù)類型的類的完整列表,請參閱數(shù)據(jù)類型。

  • @DataTypeHint

在許多情況下,需要支持以 內(nèi)聯(lián) 方式自動提取出函數(shù)參數(shù)、返回值的類型。

以下例子展示了如何使用 @DataTypeHint,詳情可參考該注解類的文檔。

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;// 有多個重載求值方法的函數(shù)
public static class OverloadedFunction extends ScalarFunction {// no hint requiredpublic Long eval(long a, long b) {return a + b;}// 定義 decimal 的精度和小數(shù)位public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {return BigDecimal.valueOf(a + b);}// 定義嵌套數(shù)據(jù)類型@DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")public Row eval(int i) {return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));}// 允許任意類型的符入,并輸出序列化定制后的值@DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return MyUtils.serializeToByteBuffer(o);}
}
  • @FunctionHint

有時我們希望一種求值方法可以同時處理多種數(shù)據(jù)類型,有時又要求對重載的多個求值方法僅聲明一次通用的結(jié)果類型。

@FunctionHint 注解可以提供從入?yún)?shù)據(jù)類型到結(jié)果數(shù)據(jù)類型的映射,它可以在整個函數(shù)類或求值方法上注解輸入、累加器和結(jié)果的數(shù)據(jù)類型。可以在類頂部聲明一個或多個注解,也可以為類的所有求值方法分別聲明一個或多個注解。所有的 hint 參數(shù)都是可選的,如果未定義參數(shù),則使用默認的基于反射的類型提取。在函數(shù)類頂部定義的 hint 參數(shù)被所有求值方法繼承。

以下例子展示了如何使用 @FunctionHint,詳情可參考該注解類的文檔。

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;// 為函數(shù)類的所有求值方法指定同一個輸出類型
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {public void eval(int a, int b) {collect(Row.of("Sum", a + b));}// overloading of arguments is still possiblepublic void eval() {collect(Row.of("Empty args", -1));}
}// 解耦類型推導(dǎo)與求值方法,類型推導(dǎo)完全取決于 FunctionHint
@FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")},output = @DataTypeHint("INT")
)
@FunctionHint(input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},output = @DataTypeHint("BIGINT")
)
@FunctionHint(input = {},output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {// an implementer just needs to make sure that a method exists that can be called by the JVMpublic void eval(Object... o) {if (o.length == 0) {collect(false);}collect(o[0]);}
}
2、定制類型推導(dǎo)

在大多數(shù)情況下,@DataTypeHint 和 @FunctionHint 足以構(gòu)建自定義函數(shù),然而通過重寫 getTypeInference() 定制自動類型推導(dǎo)邏輯,實現(xiàn)者可以創(chuàng)建任意像系統(tǒng)內(nèi)置函數(shù)那樣有用的函數(shù)。

以下用 Java 實現(xiàn)的例子展示了定制類型推導(dǎo)的潛力,它根據(jù)字符串參數(shù)來確定函數(shù)的結(jié)果類型。該函數(shù)帶有兩個字符串參數(shù):第一個參數(shù)表示要分析的字符串,第二個參數(shù)表示目標類型。

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;public static class LiteralFunction extends ScalarFunction {public Object eval(String s, String type) {switch (type) {case "INT":return Integer.valueOf(s);case "DOUBLE":return Double.valueOf(s);case "STRING":default:return s;}}// 禁用自動的反射式類型推導(dǎo),使用如下邏輯進行類型推導(dǎo)@Overridepublic TypeInference getTypeInference(DataTypeFactory typeFactory) {return TypeInference.newBuilder()// 指定輸入?yún)?shù)的類型,必要時參數(shù)會被隱式轉(zhuǎn)換.typedArguments(DataTypes.STRING(), DataTypes.STRING())// specify a strategy for the result data type of the function.outputTypeStrategy(callContext -> {if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {throw callContext.newValidationError("Literal expected for second argument.");}// 基于字符串值返回數(shù)據(jù)類型final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");switch (literal) {case "INT":return Optional.of(DataTypes.INT().notNull());case "DOUBLE":return Optional.of(DataTypes.DOUBLE().notNull());case "STRING":default:return Optional.of(DataTypes.STRING());}}).build();}
}

4)、確定性

每個用戶自定義函數(shù)類都可以通過重寫 isDeterministic() 方法來聲明它是否產(chǎn)生確定性的結(jié)果。如果該函數(shù)不是純粹函數(shù)式的(如random(), date(), 或now()),該方法必須返回 false。默認情況下,isDeterministic() 返回 true。

此外,重寫 isDeterministic() 方法也可能影響運行時行為。運行時實現(xiàn)可能會在兩個不同的階段被調(diào)用:

  • 在生成執(zhí)行計劃期間:如果一個函數(shù)是通過常量表達式調(diào)用的或者常量表達式可以從給定的語句中推導(dǎo)出來,那么一個函數(shù)就會被預(yù)計算以減少常量表達式,并且可能不再在集群上執(zhí)行。 除非 isDeterministic() 被重寫為 false 用來在這種情況下禁用常量表達式簡化。比如說,以下對 ABS 的調(diào)用在生成執(zhí)行計劃期間被執(zhí)行:SELECT ABS(-1) FROM t 和 SELECT ABS(field) FROM t WHERE field = -1,而 SELECT ABS(field) FROM t 則不執(zhí)行。

  • 在運行時(即在集群執(zhí)行):如果一個函數(shù)被調(diào)用時帶有非常量表達式或 isDeterministic() 返回 false。

1、內(nèi)置函數(shù)的確定性

系統(tǒng)(內(nèi)置)函數(shù)的確定性是不可改變的。存在兩種不具有確定性的函數(shù):動態(tài)函數(shù)和非確定性函數(shù),根據(jù) Apache Calcite SqlOperator 的定義:

  /*** Returns whether a call to this operator is guaranteed to always return* the same result given the same operands; true is assumed by default.*/public boolean isDeterministic() {return true;}/*** Returns whether it is unsafe to cache query plans referencing this* operator; false is assumed by default.*/public boolean isDynamicFunction() {return false;}

isDeterministic 表示函數(shù)的確定性,聲明返回 false 時將在運行時對每個記錄進行計算。
isDynamicFunction 聲明返回 true 時意味著該函數(shù)只能在查詢開始時被計算,對于批處理模式,它只在生成執(zhí)行計劃期間被執(zhí)行, 而對于流模式,它等效于一個非確定性的函數(shù),這是因為查詢在邏輯上是連續(xù)執(zhí)行的(流模式對動態(tài)表的連續(xù)查詢抽象),所以動態(tài)函數(shù)在每次查詢執(zhí)行時也會被重新計算(當前實現(xiàn)下等效于每條記錄計算)。

以下內(nèi)置函數(shù)總是非確定性的(批和流模式下,都在運行時對每條記錄進行計算)

  • UUID
  • RAND
  • RAND_INTEGER
  • CURRENT_DATABASE
  • UNIX_TIMESTAMP
  • CURRENT_ROW_TIMESTAMP

以下內(nèi)置時間函數(shù)是動態(tài)的,批處理模式下,將在生成執(zhí)行計劃期間被執(zhí)行(查詢開始),對于流模式,將在運行時對每條記錄進行計算

  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW
  • LOCALTIME
  • LOCALTIMESTAMP

isDynamicFunction 僅適用于內(nèi)置函數(shù)

5)、運行時集成

有時候自定義函數(shù)需要獲取一些全局信息,或者在真正被調(diào)用之前做一些配置(setup)/清理(clean-up)的工作。自定義函數(shù)也提供了 open() 和 close() 方法,你可以重寫這兩個方法做到類似于 DataStream API 中 RichFunction 的功能。

open() 方法在求值方法被調(diào)用之前先調(diào)用。close() 方法在求值方法調(diào)用完之后被調(diào)用。

open() 方法提供了一個 FunctionContext,它包含了一些自定義函數(shù)被執(zhí)行時的上下文信息,比如 metric group、分布式文件緩存,或者是全局的作業(yè)參數(shù)等。

下面的信息可以通過調(diào)用 FunctionContext 的對應(yīng)的方法來獲得:

方法描述
getMetricGroup()執(zhí)行該函數(shù)的 subtask 的 Metric Group。
getCachedFile(name)分布式文件緩存的本地臨時文件副本。
getJobParameter(name, defaultValue)跟對應(yīng)的 key 關(guān)聯(lián)的全局參數(shù)值。

下面的例子展示了如何在一個標量函數(shù)中通過 FunctionContext 來獲取一個全局的任務(wù)參數(shù):

import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;public static class HashCodeFunction extends ScalarFunction {private int factor = 0;@Overridepublic void open(FunctionContext context) throws Exception {// 獲取參數(shù) "hashcode_factor"// 如果不存在,則使用默認值 "12"factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));}public int eval(String s) {return s.hashCode() * factor;}
}TableEnvironment env = TableEnvironment.create(...);// 設(shè)置任務(wù)參數(shù)
env.getConfig().addJobParameter("hashcode_factor", "31");// 注冊函數(shù)
env.createTemporarySystemFunction("hashCode", HashCodeFunction.class);// 調(diào)用函數(shù)
env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable");

3、標量函數(shù)-自定義函數(shù)說明及示例

自定義標量函數(shù)可以把 0 到多個標量值映射成 1 個標量值,數(shù)據(jù)類型里列出的任何數(shù)據(jù)類型都可作為求值方法的參數(shù)和返回值類型。

想要實現(xiàn)自定義標量函數(shù),你需要擴展 org.apache.flink.table.functions 里面的 ScalarFunction 并且實現(xiàn)一個或者多個求值方法。標量函數(shù)的行為取決于你寫的求值方法。

求值方法必須是 public 的,而且名字必須是 eval。

下面自定義函數(shù)是將balance加上(萬元)以及求balance/age,僅僅示例如何使用,其運行結(jié)果在每次輸出的代碼后面注釋的行。

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestUDScalarFunctionDemo {@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static List<User> userList = Arrays.asList(new User(1L, "alan", 18, 20,1698742358391L), new User(2L, "alan", 19, 25,1698742359396L), new User(3L, "alan", 25, 30,1698742360407L),new User(4L, "alanchan", 28,35, 1698742361409L), new User(5L, "alanchan", 29, 35,1698742362424L));public static class TestScalarFunction extends ScalarFunction {// 接受任意類型輸入,返回 String 型輸出public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.toString() + " (萬元)";}public double eval(Integer  age, Integer  balance) {return balance / age *1.0;}}/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStream<User> users = env.fromCollection(userList);Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"),$("balance"), $("rowtime"));//1、 在 Table API 里不經(jīng)注冊直接“內(nèi)聯(lián)”調(diào)用函數(shù)Table result = usersTable.select($("id"), $("name"), call(TestScalarFunction.class, $("balance")));DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
//		resultDS.print();
//		11> (true,+I[2, alan, 25 (萬元)])
//		12> (true,+I[3, alan, 30 (萬元)])
//		13> (true,+I[4, alanchan, 35 (萬元)])
//		10> (true,+I[1, alan, 20 (萬元)])
//		14> (true,+I[5, alanchan, 35 (萬元)])Table result2 = usersTable.select($("id"), $("name"), $("age"), call(TestScalarFunction.class, $("balance")), call(TestScalarFunction.class, $("age"), $("balance")));DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(result2, Row.class);
//		result2DS.print();
//		9> (true,+I[2, alan, 19, 25 (萬元), 1.0])
//		10> (true,+I[3, alan, 25, 30 (萬元), 1.0])
//		12> (true,+I[5, alanchan, 29, 35 (萬元), 1.0])
//		11> (true,+I[4, alanchan, 28, 35 (萬元), 1.0])
//		8> (true,+I[1, alan, 18, 20 (萬元), 1.0])//2、 注冊函數(shù)tenv.createTemporarySystemFunction("TestScalarFunction", TestScalarFunction.class);// 在 Table API 里調(diào)用注冊好的函數(shù)Table result3 = usersTable.select($("id"), $("name"),call("TestScalarFunction", $("balance")));DataStream<Tuple2<Boolean, Row>> result3DS = tenv.toRetractStream(result3, Row.class);
//		result3DS.print();
//		2> (true,+I[4, alanchan, 35 (萬元)])
//		3> (true,+I[5, alanchan, 35 (萬元)])
//		15> (true,+I[1, alan, 20 (萬元)])
//		16> (true,+I[2, alan, 25 (萬元)])
//		1> (true,+I[3, alan, 30 (萬元)])// 在 SQL 里調(diào)用注冊好的函數(shù)tenv.createTemporaryView("user_view", users);Table result4 = tenv.sqlQuery("SELECT id,name,TestScalarFunction(balance) ,TestScalarFunction(age,balance) FROM user_view");DataStream<Tuple2<Boolean, Row>> result4DS = tenv.toRetractStream(result4, Row.class);result4DS.print();
//		14> (true,+I[1, alan, 20 (萬元), 1.0])
//		1> (true,+I[4, alanchan, 35 (萬元), 1.0])
//		2> (true,+I[5, alanchan, 35 (萬元), 1.0])
//		15> (true,+I[2, alan, 25 (萬元), 1.0])
//		16> (true,+I[3, alan, 30 (萬元), 1.0])env.execute();}}

4、表值函數(shù)-自定義函數(shù)說明及示例

跟自定義標量函數(shù)一樣,自定義表值函數(shù)的輸入?yún)?shù)也可以是 0 到多個標量。但是跟標量函數(shù)只能返回一個值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果輸出行只包含 1 列,會省略結(jié)構(gòu)化信息并生成標量值,這個標量值在運行階段會隱式地包裝進行里。

要定義一個表值函數(shù),你需要擴展 org.apache.flink.table.functions 下的 TableFunction,可以通過實現(xiàn)多個名為 eval 的方法對求值方法進行重載。像其他函數(shù)一樣,輸入和輸出類型也可以通過反射自動提取出來。表值函數(shù)返回的表的類型取決于 TableFunction 類的泛型參數(shù) T,不同于標量函數(shù),表值函數(shù)的求值方法本身不包含返回類型,而是通過 collect(T) 方法來發(fā)送要輸出的行。

在 Table API 中,表值函數(shù)是通過 .joinLateral(…) 或者 .leftOuterJoinLateral(…) 來使用的。joinLateral 算子會把外表(算子左側(cè)的表)的每一行跟跟表值函數(shù)返回的所有行(位于算子右側(cè))進行 (cross)join。leftOuterJoinLateral 算子也是把外表(算子左側(cè)的表)的每一行跟表值函數(shù)返回的所有行(位于算子右側(cè))進行(cross)join,并且如果表值函數(shù)返回 0 行也會保留外表的這一行。

在 SQL 里面用 JOIN 或者 以 ON TRUE 為條件的 LEFT JOIN 來配合 LATERAL TABLE() 的使用。

下面示例中包含表值函數(shù)的四種應(yīng)用方式。

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestUDTableFunctionDemo {@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static List<User> userList = Arrays.asList(new User(1L, "alan,chen", 18, 20,1698742358391L), new User(2L, "alan,chen", 19, 25,1698742359396L), new User(3L, "alan,chen", 25, 30,1698742360407L),new User(4L, "alan,chan", 28,35, 1698742361409L), new User(5L, "alan,chan", 29, 35,1698742362424L));@FunctionHint(output = @DataTypeHint("ROW<firstName STRING, lastName String>"))public static class SplitFunction extends TableFunction<Row> {public void eval(String str) {String[] names = str.split(",");collect(Row.of(names[0],names[1]));
//			for (String s : str.split(", ")) {
//				// use collect(...) to emit a row
//				collect(Row.of(s, s.length()));
//			}}}@FunctionHint(output = @DataTypeHint("ROW<id int, name String, age int, balance int, rowtime string>"))public static class OverloadedFunction extends TableFunction<Row> {public void eval(String str) {String[] user = str.split(",");collect(Row.of(Integer.valueOf(user[0]),user[1],Integer.valueOf(user[2]),Integer.valueOf(user[3]),user[4]));}}/*** @param args* @throws Exception */public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStream<User> users = env.fromCollection(userList);Table usersTable = tenv.fromDataStream(users, $("id"), $("name"), $("age"), $("balance"), $("rowtime"));// 1、 在 Table API 里不經(jīng)注冊直接“內(nèi)聯(lián)”調(diào)用函數(shù)Table result = usersTable.joinLateral(call(SplitFunction.class, $("name"))).select($("id"), $("name"),$("firstName"),$("lastName"));DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(result, Row.class);
//		resultDS.print();
//		11> (true,+I[5, alan,chan, alan, chan])
//		7> (true,+I[1, alan,chen, alan, chen])
//		9> (true,+I[3, alan,chen, alan, chen])
//		10> (true,+I[4, alan,chan, alan, chan])
//		8> (true,+I[2, alan,chen, alan, chen])DataStream<String> row = env.fromCollection(//id name age balance rowtimeArrays.asList("11,alan,18,20,1699341167461","12,alan,19,25,1699341168464","13,alan,20,30,1699341169472","14,alanchan,18,22,1699341170479","15,alanchan,19,25,1699341171482"));Table usersTable2 = tenv.fromDataStream(row, $("userString"));Table result2 = usersTable2.joinLateral(call(OverloadedFunction.class, $("userString"))).select($("userString"),$("id"),$("name"),$("age"),$("balance"),$("rowtime"))	;	DataStream<Tuple2<Boolean, Row>> result2DS = tenv.toRetractStream(result2, Row.class);
//		result2DS.print();
//		15> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
//		13> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
//		14> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
//		11> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
//		12> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])Table result3 = usersTable2.leftOuterJoinLateral(call(OverloadedFunction.class, $("userString"))).select($("userString"),$("id"),$("name"),$("age"),$("balance"),$("rowtime"))	;	DataStream<Tuple2<Boolean, Row>> result3DS = tenv.toRetractStream(result3, Row.class);
//		result3DS.print();
//		5> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
//		6> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
//		3> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
//		4> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
//		7> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])// 在 Table API 里重命名函數(shù)字段Table result4 = usersTable2.leftOuterJoinLateral(call(OverloadedFunction.class, $("userString")).as("t_id","t_name","t_age","t_balance","t_rowtime")).select($("userString"),$("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime"))	;	DataStream<Tuple2<Boolean, Row>> result4DS = tenv.toRetractStream(result4, Row.class);
//		result4DS.print();
//		10> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
//		13> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
//		14> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
//		12> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
//		11> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])//2、 注冊函數(shù)tenv.createTemporarySystemFunction("OverloadedFunction", OverloadedFunction.class);// 在 Table API 里調(diào)用注冊好的函數(shù)Table result5 = usersTable2.leftOuterJoinLateral(call("OverloadedFunction", $("userString")).as("t_id","t_name","t_age","t_balance","t_rowtime")).select($("userString"),$("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime"))	;	DataStream<Tuple2<Boolean, Row>> result5DS = tenv.toRetractStream(result5, Row.class);
//		result5DS.print();
//		11> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
//		14> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
//		15> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
//		13> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
//		12> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])Table result6 = usersTable2.joinLateral(call("OverloadedFunction", $("userString")).as("t_id","t_name","t_age","t_balance","t_rowtime")).select($("userString"),$("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime"))	;	DataStream<Tuple2<Boolean, Row>> result6DS = tenv.toRetractStream(result6, Row.class);
//		result6DS.print();
//		8> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
//		9> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
//		5> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
//		7> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
//		6> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])//3、 在 SQL 里調(diào)用注冊好的函數(shù)tenv.createTemporaryView("user_view", usersTable2);Table result7 =  tenv.sqlQuery("SELECT userString, id,name,age,balance,rowtime " +"FROM user_view, LATERAL TABLE(OverloadedFunction(userString))");DataStream<Tuple2<Boolean, Row>> result7DS = tenv.toRetractStream(result7, Row.class);
//			result7DS.print();
//			15> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
//			13> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
//			1> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
//			14> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
//			16> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])Table result8 =  tenv.sqlQuery("SELECT userString, id,name,age,balance,rowtime " +"FROM user_view "+" LEFT JOIN LATERAL TABLE( OverloadedFunction(userString)) ON TRUE  "  );DataStream<Tuple2<Boolean, Row>> result8DS = tenv.toRetractStream(result8, Row.class);
//				result8DS.print();
//				13> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
//				1> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
//				15> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
//				14> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
//				16> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])//4、 在 SQL 里重命名函數(shù)字段Table result9 =  tenv.sqlQuery("SELECT userString, t_id, t_name,t_age,t_balance,t_rowtime " +"FROM user_view "+"LEFT JOIN LATERAL TABLE(OverloadedFunction(userString)) AS T(t_id, t_name,t_age,t_balance,t_rowtime) ON TRUE");DataStream<Tuple2<Boolean, Row>> result9DS = tenv.toRetractStream(result9, Row.class);result9DS.print();
//					7> (true,+I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
//					10> (true,+I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
//					9> (true,+I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
//					8> (true,+I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
//					6> (true,+I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])env.execute();}}

以上,介紹了flink的自定義函數(shù)概述、開發(fā)指南以及標量函數(shù)、表值函數(shù)的自定義函數(shù)實現(xiàn)及說明,提供的示例均可運行并提供運行結(jié)果供參考。

http://aloenet.com.cn/news/36230.html

相關(guān)文章:

  • 網(wǎng)站備案怎么弄新鄉(xiāng)百度關(guān)鍵詞優(yōu)化外包
  • 免費推廣的軟件做網(wǎng)站優(yōu)化推廣
  • 查找企業(yè)信息的網(wǎng)站青島網(wǎng)站制作推廣
  • 農(nóng)資銷售網(wǎng)站建設(shè)方案sem百度競價推廣
  • 蘇州網(wǎng)站建設(shè)公司電話深度搜索
  • 湖南網(wǎng)站建設(shè)小公司搜索引擎的工作原理有哪些
  • ip網(wǎng)站架設(shè)上海推廣外包
  • wordpress主題開發(fā)404頁面網(wǎng)站優(yōu)化課程培訓(xùn)
  • 湖南做網(wǎng)站 x磐石網(wǎng)絡(luò)關(guān)鍵詞如何快速排名
  • 上蔡做網(wǎng)站企業(yè)如何進行網(wǎng)絡(luò)推廣
  • 注冊完域名后如何做網(wǎng)站快速seo軟件
  • ps做網(wǎng)站首頁的尺寸渠道策略的四種方式
  • 酒店設(shè)計網(wǎng)站推薦百度學(xué)術(shù)搜索入口
  • 設(shè)計網(wǎng)站怎樣做色卡百度快照排名
  • 電子商務(wù)網(wǎng)站建設(shè)特點網(wǎng)絡(luò)營銷策劃方案范文
  • wordpress 視頻不播放seo標題優(yōu)化的心得總結(jié)
  • 網(wǎng)站建設(shè)安全級別自媒體平臺收益排行榜
  • 宜賓做網(wǎng)站公司營銷策略都有哪些
  • 用html5做的美食網(wǎng)站百度明星人氣榜入口
  • 網(wǎng)站建設(shè)服務(wù)百度com百度一下你
  • 免費網(wǎng)站建設(shè)哪個好 - 百度百度新聞排行榜
  • 竹子建站怎么樣上海優(yōu)化網(wǎng)站seo公司
  • 攜程電子商務(wù)網(wǎng)站建設(shè)微信營銷的方法
  • wordpress建站如何制作微信seo顧問能賺錢嗎
  • 營銷網(wǎng)站搭建百度指數(shù)是搜索量嗎
  • 建設(shè)銀行網(wǎng)站是多少南京百度網(wǎng)站快速優(yōu)化
  • 互聯(lián)網(wǎng)公司網(wǎng)站模板百度極速版app下載安裝掙錢
  • 藍牙 技術(shù)支持 東莞網(wǎng)站建設(shè)安卓優(yōu)化大師官方版
  • 佛山做網(wǎng)站哪家公司最好seo需要培訓(xùn)才能找到工作嗎
  • 做視頻鏈接網(wǎng)站seo網(wǎng)址大全