企業(yè)網(wǎng)站哪里可以做江西seo推廣方案
系列文章目錄
Hudi第一章:編譯安裝
Hudi第二章:集成Spark
Hudi第二章:集成Spark(二)
Hudi第三章:集成Flink
文章目錄
- 系列文章目錄
- 前言
- 一、環(huán)境準備
- 1.上傳并解壓
- 2.修改配置文件
- 3.拷貝jar包
- 4.啟動sql-client
- 1.啟動hadoop
- 2.啟動session
- 3.啟動sql-client
- 二、sql-client編碼
- 1.創(chuàng)建表
- 2.插入數(shù)據(jù)
- 3.查詢數(shù)據(jù)
- 4.更新數(shù)據(jù)
- 5.流式插入
- 三、IDEA編碼
- 1.編寫pom.xml
- 2.編寫demo
- 總結(jié)
前言
之前的兩次博客學習了hudi和spark的集成,現(xiàn)在我們來學習hudi和flink的集成。
一、環(huán)境準備
1.上傳并解壓
2.修改配置文件
vim /opt/module/flink-1.13.6/conf/flink-conf.yaml
直接在最后追加即可。
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop102:8020/ckps
state.backend.incremental: true
sudo vim /etc/profile.d/my_env.sh
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
source /etc/profile.d/my_env.sh
3.拷貝jar包
cp /opt/software/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar /opt/module/flink-1.13.6/lib/
cp /opt/module/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink-1.13.6/lib/
cp /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/module/flink-1.13.6/lib/
4.啟動sql-client
1.啟動hadoop
2.啟動session
/opt/module/flink-1.13.6/bin/yarn-session.sh -d
3.啟動sql-client
bin/sql-client.sh embedded -s yarn-session
啟動成功后可以在web端看一下。
也可以跳轉(zhuǎn)到flink的webui。
現(xiàn)在我們就可以在終端寫代碼了。
二、sql-client編碼
1.創(chuàng)建表
CREATE TABLE t1(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi','path' = 'hdfs://hadoop102:8020/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ'
);
2.插入數(shù)據(jù)
INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
3.查詢數(shù)據(jù)
我們先更改一下表格式,默認的看得可能不習慣。
set sql-client.execution.result-mode=tableau;
select * from t1;
4.更新數(shù)據(jù)
前面說過hudi的更新操作就是插入一條主鍵相同的新數(shù)據(jù),由更新的ts來覆蓋舊的。
insert into t1 values('id1','Danny',27,TIMESTAMP '1970-01-02 00:00:01','par1');
可以看到數(shù)據(jù)已經(jīng)完成了更新。
5.流式插入
flink最常用的還是流式數(shù)據(jù)的處理。
CREATE TABLE sourceT (uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20)
) WITH ('connector' = 'datagen','rows-per-second' = '1'
);create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20)
)
with ('connector' = 'hudi','path' = '/tmp/hudi_flink/t2','table.type' = 'MERGE_ON_READ'
);
我們創(chuàng)建兩張表,第一張的連接器是datagen可以用來流式的生產(chǎn)數(shù)據(jù)。第二張表是正常的hudi表。
insert into t2 select * from sourceT;
我們可以在webui看一下。
因為是流式處理,所以這個進程是不會停止的。
select * from t2 limit 10;
再查看一次
我們會發(fā)現(xiàn)是不斷有數(shù)據(jù)產(chǎn)生。
三、IDEA編碼
我們需要將編譯好的一個包拉到本地。
然后將他倒入maven倉庫
mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar
1.編寫pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.hudi</groupId><artifactId>flink-hudi-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.6</flink.version><hudi.version>0.12.0</hudi.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope> <!--不會打包到依賴中,只參與編譯,不參與運行 --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--idea運行時也有webui--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>provided</scope></dependency><!--手動install到本地maven倉庫--><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink_2.12</artifactId><version>${hudi.version}</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.apache.hadoop:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>
2.編寫demo
HudiDemo.java
一個簡單的流式數(shù)據(jù)處理和剛剛一樣。
package com.atguigu.hudi.flink;import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.concurrent.TimeUnit;public class HudiDemo {public static void main(String[] args) {System.setProperty("HADOOP_USER_NAME", "atguigu");StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());// 設置狀態(tài)后端RocksDBEmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);embeddedRocksDBStateBackend.setDbStoragePath("/home/chaoge/Downloads/hudi");embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);env.setStateBackend(embeddedRocksDBStateBackend);// checkpoint配置env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/ckps");checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));checkpointConfig.setTolerableCheckpointFailureNumber(5);checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(env);sTableEnv.executeSql("CREATE TABLE sourceT (\n" +" uuid varchar(20),\n" +" name varchar(10),\n" +" age int,\n" +" ts timestamp(3),\n" +" `partition` varchar(20)\n" +") WITH (\n" +" 'connector' = 'datagen',\n" +" 'rows-per-second' = '1'\n" +")");sTableEnv.executeSql("create table t2(\n" +" uuid varchar(20),\n" +" name varchar(10),\n" +" age int,\n" +" ts timestamp(3),\n" +" `partition` varchar(20)\n" +")\n" +"with (\n" +" 'connector' = 'hudi',\n" +" 'path' = 'hdfs://hadoop102:8020/tmp/hudi_idea/t2',\n" +" 'table.type' = 'MERGE_ON_READ'\n" +")");sTableEnv.executeSql("insert into t2 select * from sourceT");}
}
當我們運行的時候,可以再本地webui查看。
127.0.0.1:8081/
也可以在hdfs路徑看一下。
總結(jié)
flink第一次就先寫到這里剩下的還要在寫一次。