国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁(yè) > news >正文

新鄉(xiāng)網(wǎng)站推廣公司微信運(yùn)營(yíng)

新鄉(xiāng)網(wǎng)站推廣公司,微信運(yùn)營(yíng),繁體版 企業(yè)網(wǎng)站,服務(wù)器的作用和用途1.CDC概述 CDC(Change Data Capture)是一種用于捕獲和處理數(shù)據(jù)源中的變化的技術(shù)。它允許實(shí)時(shí)地監(jiān)視數(shù)據(jù)庫(kù)或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動(dòng),并將這些變動(dòng)抽取出來(lái),以便進(jìn)行進(jìn)一步的處理和分析。 傳統(tǒng)上,數(shù)據(jù)源的變化通常通過(guò)…

1.CDC概述


CDC(Change Data Capture)是一種用于捕獲和處理數(shù)據(jù)源中的變化的技術(shù)。它允許實(shí)時(shí)地監(jiān)視數(shù)據(jù)庫(kù)或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動(dòng),并將這些變動(dòng)抽取出來(lái),以便進(jìn)行進(jìn)一步的處理和分析。

傳統(tǒng)上,數(shù)據(jù)源的變化通常通過(guò)周期性地輪詢整個(gè)數(shù)據(jù)集進(jìn)行檢查來(lái)實(shí)現(xiàn)。但是,這種輪詢的方式效率低下且不能實(shí)時(shí)反應(yīng)變化。而 CDC 技術(shù)則通過(guò)在數(shù)據(jù)源上設(shè)置一種機(jī)制,使得變化的數(shù)據(jù)可以被實(shí)時(shí)捕獲并傳遞給下游處理系統(tǒng),從而實(shí)現(xiàn)了實(shí)時(shí)的數(shù)據(jù)變動(dòng)監(jiān)控。

Flink 作為一個(gè)強(qiáng)大的流式計(jì)算引擎,提供了內(nèi)置的 CDC 功能,能夠連接到各種數(shù)據(jù)源(如數(shù)據(jù)庫(kù)、消息隊(duì)列等),捕獲其中的數(shù)據(jù)變化,并進(jìn)行靈活的實(shí)時(shí)處理和分析。

通過(guò)使用 Flink CDC,我們可以輕松地構(gòu)建實(shí)時(shí)數(shù)據(jù)管道,對(duì)數(shù)據(jù)變動(dòng)進(jìn)行實(shí)時(shí)響應(yīng)和處理,為實(shí)時(shí)分析、實(shí)時(shí)報(bào)表和實(shí)時(shí)決策等場(chǎng)景提供強(qiáng)大的支持。

Flink_CDC

?

2.CDC 的實(shí)現(xiàn)原理


通常來(lái)講,CDC 分為主動(dòng)查詢和事件接收兩種技術(shù)實(shí)現(xiàn)模式。對(duì)于主動(dòng)查詢而言,用戶通常會(huì)在數(shù)據(jù)源表的某個(gè)字段中,保存上次更新的時(shí)間戳或版本號(hào)等信息,然后下游通過(guò)不斷的查詢和與上次的記錄做對(duì)比,來(lái)確定數(shù)據(jù)是否有變動(dòng),是否需要同步。這種方式優(yōu)點(diǎn)是不涉及數(shù)據(jù)庫(kù)底層特性,實(shí)現(xiàn)比較通用;缺點(diǎn)是要對(duì)業(yè)務(wù)表做改造,且實(shí)時(shí)性不高,不能確保跟蹤到所有的變更記錄,且持續(xù)的頻繁查詢對(duì)數(shù)據(jù)庫(kù)的壓力較大。事件接收模式可以通過(guò)觸發(fā)器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)來(lái)實(shí)現(xiàn)。當(dāng)數(shù)據(jù)源表發(fā)生變動(dòng)時(shí),會(huì)通過(guò)附加在表上的觸發(fā)器或者 binlog 等途徑,將操作記錄下來(lái)。下游可以通過(guò)數(shù)據(jù)庫(kù)底層的協(xié)議,訂閱并消費(fèi)這些事件,然后對(duì)數(shù)據(jù)庫(kù)變動(dòng)記錄做重放,從而實(shí)現(xiàn)同步。這種方式的優(yōu)點(diǎn)是實(shí)時(shí)性高,可以精確捕捉上游的各種變動(dòng);缺點(diǎn)是部署數(shù)據(jù)庫(kù)的事件接收和解析器(例如 Debezium、Canal 等),有一定的學(xué)習(xí)和運(yùn)維成本,對(duì)一些冷門的數(shù)據(jù)庫(kù)支持不夠。綜合來(lái)看,事件接收模式整體在實(shí)時(shí)性、吞吐量方面占優(yōu),如果數(shù)據(jù)源是 MySQL、PostgreSQL、MongoDB 等常見(jiàn)的數(shù)據(jù)庫(kù)實(shí)現(xiàn),建議使用Debezium來(lái)實(shí)現(xiàn)變更數(shù)據(jù)的捕獲(下圖來(lái)自Debezium 官方文檔如果使用的只有 MySQL,則還可以用Canal。
在這里插入圖片描述

?

3.為什么選 Flink
從上圖可以看到,Debezium 官方架構(gòu)圖中,是通過(guò) Kafka Streams 直接實(shí)現(xiàn)的 CDC 功能。而我們這里更建議使用 Flink CDC 模塊,因?yàn)?Flink 相對(duì) Kafka Streams 而言,有如下優(yōu)勢(shì):

強(qiáng)大的流處理引擎: Flink 是一個(gè)強(qiáng)大的流處理引擎,具備高吞吐量、低延遲、Exactly-Once 語(yǔ)義等特性。它通過(guò)基于事件時(shí)間的處理模型,支持準(zhǔn)確和有序的數(shù)據(jù)處理,適用于實(shí)時(shí)數(shù)據(jù)處理和分析場(chǎng)景。這使得 Flink 成為實(shí)現(xiàn) CDC 的理想選擇。

內(nèi)置的 CDC 功能: Flink 提供了內(nèi)置的 CDC 功能,可以直接連接到各種數(shù)據(jù)源,捕獲數(shù)據(jù)變化,并將其作為數(shù)據(jù)流進(jìn)行處理。這消除了我們自行開發(fā)或集成 CDC 解決方案的需要,使得實(shí)現(xiàn) CDC 變得更加簡(jiǎn)單和高效。

多種數(shù)據(jù)源的支持: Flink CDC 支持與各種數(shù)據(jù)源進(jìn)行集成,如關(guān)系型數(shù)據(jù)庫(kù)(如MySQL、PostgreSQL)、消息隊(duì)列(如Kafka、RabbitMQ)、文件系統(tǒng)等。這意味著無(wú)論你的數(shù)據(jù)存儲(chǔ)在哪里,Flink 都能夠輕松地捕獲其中的數(shù)據(jù)變化,并進(jìn)行進(jìn)一步的實(shí)時(shí)處理和分析。

靈活的數(shù)據(jù)處理能力: Flink 提供了靈活且強(qiáng)大的數(shù)據(jù)處理能力,可以通過(guò)編寫自定義的轉(zhuǎn)換函數(shù)、處理函數(shù)等來(lái)對(duì) CDC 數(shù)據(jù)進(jìn)行各種實(shí)時(shí)計(jì)算和分析。同時(shí),Flink 還集成了 SQL 和 Table API,為用戶提供了使用 SQL 查詢語(yǔ)句或 Table API 進(jìn)行簡(jiǎn)單查詢和分析的方式。

完善的生態(tài)系統(tǒng): Flink 擁有活躍的社區(qū)和龐大的生態(tài)系統(tǒng),這意味著你可以輕松地獲取到豐富的文檔、教程、示例代碼和解決方案。此外,Flink 還與其他流行的開源項(xiàng)目(如Apache Kafka、Elasticsearch)深度集成,提供了更多的功能和靈活性。

4.支持的連接器

?5.支持的 Flink 版本

?

6.Flink CDC特性
支持讀取數(shù)據(jù)庫(kù)快照,即使出現(xiàn)故障也能繼續(xù)讀取binlog,并進(jìn)行Exactly-once處理
DataStream API 的 CDC 連接器,用戶可以在單個(gè)作業(yè)中使用多個(gè)數(shù)據(jù)庫(kù)和表的更改,而無(wú)需部署 Debezium 和 Kafka
Table/SQL API 的 CDC 連接器,用戶可以使用 SQL DDL 創(chuàng)建 CDC 源來(lái)監(jiān)視單個(gè)表上的更改
下表顯示了連接器的當(dāng)前特性:

?

7.用法實(shí)例

7.1DataStream API 的用法(推薦)

請(qǐng)嚴(yán)格按照上面的《5.支持的 Flink 版本》搭配來(lái)使用Flink CDC

<properties><flink.version>1.13.0</flink.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flinkcdc.version}</version>
</dependency>
<!-- flink核心API -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version>
</dependency>

請(qǐng)?zhí)崆伴_啟MySQL中的binlog,配置my.cnf文件,重啟mysqld服務(wù)即可

my.cnf

[client]
default_character_set=utf8
[mysqld]
server-id=1
collation_server=utf8_general_ci
character_set_server=utf8
log-bin=mysql-bin
binlog_format=row
expire_logs_days=30

ddl&dml.sql

create table test_cdc
(id   int          not nullprimary key,name varchar(100) null,age  int          null
);INSERT INTO flink.test_cdc (id, name, age) VALUES (1, 'Daniel', 25);
INSERT INTO flink.test_cdc (id, name, age) VALUES (2, 'David', 38);
INSERT INTO flink.test_cdc (id, name, age) VALUES (3, 'James', 16);
INSERT INTO flink.test_cdc (id, name, age) VALUES (4, 'Robert', 27);

FlinkDSCDC.java

package com.daniel.util;import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @Author Daniel* @Date: 2023/7/25 10:03* @Description DataStream API CDC**/
public class FlinkDSCDC {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().hostname("localhost").port(3306).username("root").password("123456").databaseList("flink")// 這里一定要是db.table的形式.tableList("flink.test_cdc").deserializer(new StringDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);dataStreamSource.print();env.execute("FlinkDSCDC");}
}
UPDATE flink.test_cdc t SET t.age = 24 WHERE t.id = 1;
UPDATE flink.test_cdc t SET t.name = 'Andy' WHERE t.id = 3;

打印出的日志

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=1,name=Daniel,age=25},after=Struct{id=1,name=Daniel,age=24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=7989,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=4}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=3,name=James,age=16},after=Struct{id=3,name=Andy,age=16},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=8113,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

可以得出的結(jié)論:

1、日志中的數(shù)據(jù)變化操作類型(op)可以表示為 ‘u’,表示更新操作。在第一條日志中,發(fā)生了一個(gè)更新操作,對(duì)應(yīng)的記錄的 key 是 id=1,更新前的數(shù)據(jù)是 {id=1, name=Daniel, age=25},更新后的數(shù)據(jù)是 {id=1, name=Daniel, age=24}。在第二條日志中,也發(fā)生了一個(gè)更新操作,對(duì)應(yīng)的記錄的 key 是 id=3,更新前的數(shù)據(jù)是 {id=3, name=James, age=16},更新后的數(shù)據(jù)是 {id=3, name=Andy, age=16}。
2、每條日志還提供了其他元數(shù)據(jù)信息,如數(shù)據(jù)源(source)、版本號(hào)(version)、連接器名稱(connector)、時(shí)間戳(ts_ms)等。這些信息可以幫助我們追蹤記錄的來(lái)源和處理過(guò)程。
3、日志中的 sourceOffset 包含了一些關(guān)鍵信息,如事務(wù)ID(transaction_id)、文件名(file)、偏移位置(pos)等。這些信息可以用于確保數(shù)據(jù)的準(zhǔn)確順序和一致性。

7.2Table/SQL API的用法

FlinkSQLCDC.java

package com.daniel.util;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @Author Daniel* @Date: 2023/7/25 15:25* @Description**/
public class FlinkSQLCDC {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("CREATE TABLE test_cdc (" +" id int primary key," +" name STRING," +" age int" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'scan.startup.mode' = 'latest-offset'," +" 'hostname' = 'localhost'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '123456'," +" 'database-name' = 'flink'," +" 'table-name' = 'test_cdc'" +")");Table table = tableEnv.sqlQuery("select * from test_cdc");DataStream<Tuple2<Boolean, Row>> dataStreamSource = tableEnv.toRetractStream(table, Row.class);dataStreamSource.print();env.execute("FlinkSQLCDC");}
}
UPDATE flink.test_cdc t SET t.age = 55 WHERE t.id = 2;
UPDATE flink.test_cdc t SET t.age = 22 WHERE t.id = 3;
UPDATE flink.test_cdc t SET t.name = 'Alice' WHERE t.id = 4;
UPDATE flink.test_cdc t SET t.age = 18 WHERE t.id = 1;
INSERT INTO flink.test_cdc (id, name, age) VALUES (5, 'David', 29);

打印出的日志

(false,-U[2, David, 38])
(true,+U[2, David, 55])
(false,-U[3, Andy, 16])
(true,+U[3, Andy, 22])
(false,-U[4, Robert, 27])
(true,+U[4, Alice, 27])
(false,-U[1, Daniel, 24])
(true,+U[1, Daniel, 18])
(true,+I[5, David, 29])

http://aloenet.com.cn/news/27861.html

相關(guān)文章:

  • 中國(guó)建設(shè)銀行行網(wǎng)站谷歌優(yōu)化的最佳方案
  • 做漁具最大的外貿(mào)網(wǎng)站營(yíng)銷方案案例范文
  • 做教育網(wǎng)站多少錢淘寶推廣引流方法有哪些
  • 網(wǎng)站開發(fā)技術(shù)教程百度搜索指數(shù)是怎么計(jì)算的
  • 企業(yè)網(wǎng)站制作機(jī)構(gòu)排名怎樣進(jìn)行關(guān)鍵詞推廣
  • 微信公眾平臺(tái)客服谷歌seo培訓(xùn)
  • 做商城網(wǎng)站需要什么資質(zhì)海南網(wǎng)站制作
  • 做網(wǎng)站和做推廣有什么區(qū)別站內(nèi)推廣
  • 新鄉(xiāng)網(wǎng)站建設(shè)哪家好seo包年服務(wù)
  • 網(wǎng)站備案要求北京seo薪資
  • 西安是哪個(gè)省屬于哪個(gè)省專業(yè)網(wǎng)站推廣優(yōu)化
  • 做網(wǎng)站的屬于什么重慶seo排名優(yōu)化
  • 個(gè)人可以做電影網(wǎng)站嗎seo診斷工具有哪些
  • 網(wǎng)站建設(shè)設(shè)計(jì) 飛沐中小企業(yè)網(wǎng)站制作
  • 牡丹江百度seo排名優(yōu)化公司推薦
  • 網(wǎng)站數(shù)據(jù)庫(kù)怎么做同步今日重要新聞
  • 輕松做網(wǎng)站江蘇seo平臺(tái)
  • 企業(yè)做網(wǎng)站營(yíng)銷的四大途徑東莞推廣公司
  • 八喜網(wǎng)站建設(shè)微平臺(tái)推廣
  • 南京網(wǎng)站制作搭建app推廣怎么做
  • 網(wǎng)架加工廠家德州網(wǎng)站建設(shè)優(yōu)化
  • wordpress做論壇網(wǎng)站app推廣方法
  • ninaszjs wordpress電腦系統(tǒng)優(yōu)化軟件排行榜
  • 網(wǎng)站開發(fā)個(gè)人總結(jié)市場(chǎng)調(diào)研與分析
  • 商丘專業(yè)做網(wǎng)站seo外鏈建設(shè)方法
  • 公司網(wǎng)站模板內(nèi)容seo值怎么提高
  • 網(wǎng)上那些彩票網(wǎng)站可以自己做嗎最近實(shí)時(shí)熱點(diǎn)新聞事件
  • 鄭州網(wǎng)站搭建的公司網(wǎng)絡(luò)營(yíng)銷方案的制定
  • 美國(guó)網(wǎng)站后綴搜索引擎優(yōu)化的方法
  • 中國(guó)建設(shè)網(wǎng)站企業(yè)網(wǎng)上銀行業(yè)務(wù)功能南京百度seo代理