国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

網(wǎng)站建設中代碼怎么自己做網(wǎng)頁

網(wǎng)站建設中代碼,怎么自己做網(wǎng)頁,杭州做兼職網(wǎng)站建設,網(wǎng)站建設費維護費Flink 系列文章 1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接 13、Flink 的table api與sql的基本概念、通用api介紹及入門示例 14、Flink 的table api與sql之數(shù)據(jù)類型: 內置數(shù)據(jù)類型以及它們的屬性 15、Flink 的ta…

Flink 系列文章

1、Flink 部署、概念介紹、source、transformation、sink使用示例、四大基石介紹和示例等系列綜合文章鏈接

13、Flink 的table api與sql的基本概念、通用api介紹及入門示例
14、Flink 的table api與sql之數(shù)據(jù)類型: 內置數(shù)據(jù)類型以及它們的屬性
15、Flink 的table api與sql之流式概念-詳解的介紹了動態(tài)表、時間屬性配置(如何處理更新結果)、時態(tài)表、流上的join、流上的確定性以及查詢配置
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和語法
19、Flink 的Table API 和 SQL 中的內置函數(shù)及示例(1)
19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(2)
19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(3)
19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(4)
20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務到集群上

22、Flink 的table api與sql之創(chuàng)建表的DDL
24、Flink 的table api與sql之Catalogs(介紹、類型、java api和sql實現(xiàn)ddl、java api和sql操作catalog)-1
24、Flink 的table api與sql之Catalogs(java api操作數(shù)據(jù)庫、表)-2
24、Flink 的table api與sql之Catalogs(java api操作視圖)-3
24、Flink 的table api與sql之Catalogs(java api操作分區(qū)與函數(shù))-4

26、Flink 的SQL之概覽與入門示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介紹及詳細示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介紹及詳細示例(2)
27、Flink 的SQL之SELECT (窗口函數(shù))介紹及詳細示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介紹及詳細示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分組聚合、Over Aggregation Over聚合 和 Window Join 窗口關聯(lián))介紹及詳細示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介紹及詳細示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式檢測)介紹及詳細示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 語句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客戶端(通過kafka和filesystem的例子介紹了配置文件使用-表、視圖等)
32、Flink table api和SQL 之用戶自定義 Sources & Sinks實現(xiàn)及詳細示例
41、Flink之Hive 方言介紹及詳細示例
42、Flink 的table api與sql之Hive Catalog
43、Flink之Hive 讀寫及詳細驗證示例
44、Flink之module模塊介紹及使用示例和Flink SQL使用hive內置函數(shù)及自定義函數(shù)詳細示例–網(wǎng)上有些說法好像是錯誤的


文章目錄

  • Flink 系列文章
    • 7、sql clinet中應用自定義函數(shù)
      • 1)、實現(xiàn)自定義函數(shù)
      • 2)、打包并上傳jar至flink的lib目錄下
      • 3)、驗證
        • 1、創(chuàng)建表
        • 2、初始化表數(shù)據(jù)
        • 3、注冊函數(shù)
        • 4、驗證自定義函數(shù)
    • 8、pojo 數(shù)據(jù)類型應用示例-表值函數(shù)


本文展示了自定義函數(shù)在Flink sql client的應用以及自定義函數(shù)中使用pojo的示例。
本文依賴flink、kafka集群能正常使用。
本文分為2個部分,即自定義函數(shù)在Flink sql client中的應用以及自定義函數(shù)中使用pojo數(shù)據(jù)類型。
本文的示例如無特殊說明則是在Flink 1.17版本中運行。

7、sql clinet中應用自定義函數(shù)

本示例將上文中自定義的函數(shù)打包后在flink sql client中進行應用。

1)、實現(xiàn)自定義函數(shù)

本文的所有示例需要依賴的maven見本篇的上一篇:17、Flink 之Table API: Table API 支持的操作(1)
或者引入

    <!-- flink依賴引入--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
  • 示例代碼
package org.table_sql;import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;/*** @author alanchan**/@FunctionHint(output = @DataTypeHint("ROW<id int, name String, age int, balance int, rowtime string>"))
public class Alan_SplitFunction extends TableFunction<Row> {public void eval(String str) {String[] row = str.split(",");collect(Row.of(Integer.valueOf(row[0]), row[1], Integer.valueOf(row[2]), Integer.valueOf(row[3]), row[4]));}}

2)、打包并上傳jar至flink的lib目錄下

將該文件打包成jar文件,特別說明的是,注意flink運行環(huán)境與打包引入的jar文件是否沖突,推薦做法是只打包創(chuàng)建自定義函數(shù)所依賴的jar文件,其他jar使用flink部署環(huán)境的jar。
本示例打包后的文件名:Alan_SplitFunction.jar
上傳jar文件后,并重啟flink集群。

3)、驗證

1、創(chuàng)建表
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.Flink SQL> CREATE TABLE alan_split_table (
>   userString STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'alan_split',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> select * from alan_split_table;
[INFO] Result retrieval cancelled.
2、初始化表數(shù)據(jù)

本示例是通過kafka隊列插入的數(shù)據(jù),前提是kafka環(huán)境好用。

[alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_split
>"11,alan,18,20,1699341167461"
>"12,alan,19,25,1699341168464"
>"13,alan,20,30,1699341169472"
>"14,alanchan,18,22,1699341170479"
>"15,alanchan,19,25,1699341171482"Flink SQL> select * from alan_split_table;
+----+--------------------------------+
| op |                     userString |
+----+--------------------------------+
| +I |    11,alan,18,20,1699341167461 |
| +I |    12,alan,19,25,1699341168464 |
| +I |    13,alan,20,30,1699341169472 |
| +I | 14,alanchan,18,22,169934117... |
| +I | 15,alanchan,19,25,169934117... |
3、注冊函數(shù)

將自定義的函數(shù)注冊為flink的臨時函數(shù),臨時函數(shù)只在當前的會話中起作用,如果注冊成其他函數(shù),參考如下語法

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION[IF NOT EXISTS] [[catalog_name.]db_name.]function_nameAS identifier [LANGUAGE JAVA|SCALA|PYTHON][USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]# TEMPORARY
# 創(chuàng)建一個有 catalog 和數(shù)據(jù)庫命名空間的臨時 catalog function ,并覆蓋原有的 catalog function 。# TEMPORARY SYSTEM
# 創(chuàng)建一個沒有數(shù)據(jù)庫命名空間的臨時系統(tǒng) catalog function ,并覆蓋系統(tǒng)內置的函數(shù)。

本示例注冊為臨時函數(shù),如下

Flink SQL> CREATE TEMPORARY FUNCTION alan_split AS 'org.table_sql.Alan_SplitFunction';
[INFO] Execute statement succeed.Flink SQL> show functions;
+-----------------------+
|         function name |
+-----------------------+
|                IFNULL |
|      SOURCE_WATERMARK |
|                TYPEOF |
|                   abs |
|                  acos |
|            alan_split |
|                   and |
|                 array |
。。。。。。
4、驗證自定義函數(shù)
Flink SQL> SELECT userString, t_id, t_name,t_age,t_balance,t_rowtime 
> FROM alan_split_table 
> LEFT JOIN LATERAL TABLE(alan_split(userString)) AS T(t_id, t_name,t_age,t_balance,t_rowtime) ON TRUE;
+----+--------------------------------+-------------+--------------------------------+-------------+-------------+--------------------------------+
| op |                     userString |        t_id |                         t_name |       t_age |   t_balance |                      t_rowtime |
+----+--------------------------------+-------------+--------------------------------+-------------+-------------+--------------------------------+
| +I |    11,alan,18,20,1699341167461 |          11 |                           alan |          18 |          20 |                  1699341167461 |
| +I |    12,alan,19,25,1699341168464 |          12 |                           alan |          19 |          25 |                  1699341168464 |
| +I |    13,alan,20,30,1699341169472 |          13 |                           alan |          20 |          30 |                  1699341169472 |
| +I | 14,alanchan,18,22,169934117... |          14 |                       alanchan |          18 |          22 |                  1699341170479 |
| +I | 15,alanchan,19,25,169934117... |          15 |                       alanchan |          19 |          25 |                  1699341171482 |

至此,完成了自定義函數(shù)注冊至flink sql client的驗證。

8、pojo 數(shù)據(jù)類型應用示例-表值函數(shù)

功能參考 19、Flink 的Table API 和 SQL 中的自定義函數(shù)及示例(2) 中的【4、表值函數(shù)-自定義函數(shù)說明及示例】
本示例僅僅是展示在自定義函數(shù)中使用pojo 對象。

本示例僅僅是一種實現(xiàn)方式,也可以覆蓋getTypeInference并以編程方式提供所有組件,不再贅述。

本示例僅僅是以表值函數(shù)作為示例,其他的自定義函數(shù)類似。

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestUDTableFunctionDemo2 {@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private int id;private String name;private int age;private int balance;private String rowtime;}//	@FunctionHint(output = @DataTypeHint("User<id int, name String, age int, balance int, rowtime string>"))
//	public static class OverloadedFunction extends TableFunction<Row> {@FunctionHint(output =@DataTypeHint(bridgedTo = User.class))public static class OverloadedFunction extends TableFunction<User> {public void eval(String str) {String[] user = str.split(",");// 使用 Row數(shù)據(jù)類型
//			collect(Row.of(Integer.valueOf(user[0]), user[1], Integer.valueOf(user[2]), Integer.valueOf(user[3]), user[4]));// 使用User pojo數(shù)據(jù)類型collect(new User(Integer.valueOf(user[0]), user[1], Integer.valueOf(user[2]), Integer.valueOf(user[3]), user[4]));}}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStream<String> row = env.fromCollection(//id name age balance rowtimeArrays.asList("11,alan,18,20,1699341167461","12,alan,19,25,1699341168464","13,alan,20,30,1699341169472","14,alanchan,18,22,1699341170479","15,alanchan,19,25,1699341171482"));Table usersTable2 = tenv.fromDataStream(row, $("userString"));tenv.createTemporarySystemFunction("OverloadedFunction", OverloadedFunction.class);Table result5 = usersTable2.leftOuterJoinLateral(call("OverloadedFunction", $("userString")).as("t_id","t_name","t_age","t_balance","t_rowtime")).select($("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime")
//				.select($("userString"),$("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime"))	;	DataStream<Tuple2<Boolean, Row>> result5DS = tenv.toRetractStream(result5, Row.class);result5DS.print();
//		15> (true,+I[15, alanchan, 19, 25, 1699341171482])
//		12> (true,+I[12, alan, 19, 25, 1699341168464])
//		13> (true,+I[13, alan, 20, 30, 1699341169472])
//		11> (true,+I[11, alan, 18, 20, 1699341167461])
//		14> (true,+I[14, alanchan, 18, 22, 1699341170479])env.execute();}}

以上,展示了自定義函數(shù)在Flink sql client的應用以及自定義函數(shù)中使用pojo的示例。

http://aloenet.com.cn/news/30775.html

相關文章:

  • 網(wǎng)站設計與管理邯鄲百度推廣公司
  • 哪里有國內網(wǎng)站建設公司淄博網(wǎng)站制作
  • 網(wǎng)站建站公司排名優(yōu)化網(wǎng)站的公司哪家好
  • 日照網(wǎng)站建設千萬別在百度上搜別人名字
  • 杭州灣新區(qū)建設局網(wǎng)站seo優(yōu)化便宜
  • 學校網(wǎng)站建設軟件推薦免費的h5制作網(wǎng)站
  • 建設網(wǎng)站建設什么掙錢互聯(lián)網(wǎng)品牌營銷公司
  • 蘇州高端網(wǎng)站建設解釋seo網(wǎng)站推廣
  • 電腦做會計從業(yè)題目用什么網(wǎng)站最新新聞事件
  • 建設銀行網(wǎng)站個人中心大量微信群推廣代發(fā)廣告
  • axure怎么做網(wǎng)站引流推廣營銷
  • c 網(wǎng)站開發(fā)如何每天10點執(zhí)行任務東莞有哪些做推廣的網(wǎng)站
  • 浙江疫情最新消息2020seo超級外鏈工具免費
  • 馬云早期在政府做網(wǎng)站學電商哪個培訓學校好
  • 日本風格網(wǎng)站seo快速優(yōu)化
  • 哪些企業(yè)網(wǎng)站做得好本地推薦本地推薦
  • 南京建設工程監(jiān)管網(wǎng)站營銷100個引流方案
  • 怎樣做網(wǎng)站服務器亞馬遜關鍵詞搜索工具
  • 做網(wǎng)站要多長時間重慶森林電影簡介
  • 自動寫作文網(wǎng)站建站模板免費下載
  • 如何做招商性網(wǎng)站百度資源搜索平臺
  • 做代購直接網(wǎng)站下單成都關鍵詞優(yōu)化報價
  • c2c電子商務網(wǎng)站建設欄目結構圖最近國際新聞
  • 有沒有好的網(wǎng)站是JSP做的高端網(wǎng)站建設哪個好
  • 江西省興贛建設監(jiān)理咨詢有限公司網(wǎng)站個人網(wǎng)站建設
  • 西寧 網(wǎng)站建設武漢最新疫情
  • dell網(wǎng)站的網(wǎng)站設計特色優(yōu)化營商環(huán)境 提升服務效能
  • 網(wǎng)站建設解決方中國疫情最新數(shù)據(jù)
  • 陽春網(wǎng)站開發(fā)鄭州本地seo顧問
  • 甘肅蘭州做網(wǎng)站企業(yè)qq怎么申請