新鄉(xiāng)網(wǎng)站推廣公司微信運(yùn)營(yíng)
1.CDC概述
CDC(Change Data Capture)是一種用于捕獲和處理數(shù)據(jù)源中的變化的技術(shù)。它允許實(shí)時(shí)地監(jiān)視數(shù)據(jù)庫(kù)或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動(dòng),并將這些變動(dòng)抽取出來(lái),以便進(jìn)行進(jìn)一步的處理和分析。
傳統(tǒng)上,數(shù)據(jù)源的變化通常通過(guò)周期性地輪詢整個(gè)數(shù)據(jù)集進(jìn)行檢查來(lái)實(shí)現(xiàn)。但是,這種輪詢的方式效率低下且不能實(shí)時(shí)反應(yīng)變化。而 CDC 技術(shù)則通過(guò)在數(shù)據(jù)源上設(shè)置一種機(jī)制,使得變化的數(shù)據(jù)可以被實(shí)時(shí)捕獲并傳遞給下游處理系統(tǒng),從而實(shí)現(xiàn)了實(shí)時(shí)的數(shù)據(jù)變動(dòng)監(jiān)控。
Flink 作為一個(gè)強(qiáng)大的流式計(jì)算引擎,提供了內(nèi)置的 CDC 功能,能夠連接到各種數(shù)據(jù)源(如數(shù)據(jù)庫(kù)、消息隊(duì)列等),捕獲其中的數(shù)據(jù)變化,并進(jìn)行靈活的實(shí)時(shí)處理和分析。
通過(guò)使用 Flink CDC,我們可以輕松地構(gòu)建實(shí)時(shí)數(shù)據(jù)管道,對(duì)數(shù)據(jù)變動(dòng)進(jìn)行實(shí)時(shí)響應(yīng)和處理,為實(shí)時(shí)分析、實(shí)時(shí)報(bào)表和實(shí)時(shí)決策等場(chǎng)景提供強(qiáng)大的支持。
?
2.CDC 的實(shí)現(xiàn)原理
通常來(lái)講,CDC 分為主動(dòng)查詢和事件接收兩種技術(shù)實(shí)現(xiàn)模式。對(duì)于主動(dòng)查詢而言,用戶通常會(huì)在數(shù)據(jù)源表的某個(gè)字段中,保存上次更新的時(shí)間戳或版本號(hào)等信息,然后下游通過(guò)不斷的查詢和與上次的記錄做對(duì)比,來(lái)確定數(shù)據(jù)是否有變動(dòng),是否需要同步。這種方式優(yōu)點(diǎn)是不涉及數(shù)據(jù)庫(kù)底層特性,實(shí)現(xiàn)比較通用;缺點(diǎn)是要對(duì)業(yè)務(wù)表做改造,且實(shí)時(shí)性不高,不能確保跟蹤到所有的變更記錄,且持續(xù)的頻繁查詢對(duì)數(shù)據(jù)庫(kù)的壓力較大。事件接收模式可以通過(guò)觸發(fā)器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)來(lái)實(shí)現(xiàn)。當(dāng)數(shù)據(jù)源表發(fā)生變動(dòng)時(shí),會(huì)通過(guò)附加在表上的觸發(fā)器或者 binlog 等途徑,將操作記錄下來(lái)。下游可以通過(guò)數(shù)據(jù)庫(kù)底層的協(xié)議,訂閱并消費(fèi)這些事件,然后對(duì)數(shù)據(jù)庫(kù)變動(dòng)記錄做重放,從而實(shí)現(xiàn)同步。這種方式的優(yōu)點(diǎn)是實(shí)時(shí)性高,可以精確捕捉上游的各種變動(dòng);缺點(diǎn)是部署數(shù)據(jù)庫(kù)的事件接收和解析器(例如 Debezium、Canal 等),有一定的學(xué)習(xí)和運(yùn)維成本,對(duì)一些冷門的數(shù)據(jù)庫(kù)支持不夠。綜合來(lái)看,事件接收模式整體在實(shí)時(shí)性、吞吐量方面占優(yōu),如果數(shù)據(jù)源是 MySQL、PostgreSQL、MongoDB 等常見(jiàn)的數(shù)據(jù)庫(kù)實(shí)現(xiàn),建議使用Debezium來(lái)實(shí)現(xiàn)變更數(shù)據(jù)的捕獲(下圖來(lái)自Debezium 官方文檔如果使用的只有 MySQL,則還可以用Canal。
?
3.為什么選 Flink
從上圖可以看到,Debezium 官方架構(gòu)圖中,是通過(guò) Kafka Streams 直接實(shí)現(xiàn)的 CDC 功能。而我們這里更建議使用 Flink CDC 模塊,因?yàn)?Flink 相對(duì) Kafka Streams 而言,有如下優(yōu)勢(shì):
強(qiáng)大的流處理引擎: Flink 是一個(gè)強(qiáng)大的流處理引擎,具備高吞吐量、低延遲、Exactly-Once 語(yǔ)義等特性。它通過(guò)基于事件時(shí)間的處理模型,支持準(zhǔn)確和有序的數(shù)據(jù)處理,適用于實(shí)時(shí)數(shù)據(jù)處理和分析場(chǎng)景。這使得 Flink 成為實(shí)現(xiàn) CDC 的理想選擇。
內(nèi)置的 CDC 功能: Flink 提供了內(nèi)置的 CDC 功能,可以直接連接到各種數(shù)據(jù)源,捕獲數(shù)據(jù)變化,并將其作為數(shù)據(jù)流進(jìn)行處理。這消除了我們自行開發(fā)或集成 CDC 解決方案的需要,使得實(shí)現(xiàn) CDC 變得更加簡(jiǎn)單和高效。
多種數(shù)據(jù)源的支持: Flink CDC 支持與各種數(shù)據(jù)源進(jìn)行集成,如關(guān)系型數(shù)據(jù)庫(kù)(如MySQL、PostgreSQL)、消息隊(duì)列(如Kafka、RabbitMQ)、文件系統(tǒng)等。這意味著無(wú)論你的數(shù)據(jù)存儲(chǔ)在哪里,Flink 都能夠輕松地捕獲其中的數(shù)據(jù)變化,并進(jìn)行進(jìn)一步的實(shí)時(shí)處理和分析。
靈活的數(shù)據(jù)處理能力: Flink 提供了靈活且強(qiáng)大的數(shù)據(jù)處理能力,可以通過(guò)編寫自定義的轉(zhuǎn)換函數(shù)、處理函數(shù)等來(lái)對(duì) CDC 數(shù)據(jù)進(jìn)行各種實(shí)時(shí)計(jì)算和分析。同時(shí),Flink 還集成了 SQL 和 Table API,為用戶提供了使用 SQL 查詢語(yǔ)句或 Table API 進(jìn)行簡(jiǎn)單查詢和分析的方式。
完善的生態(tài)系統(tǒng): Flink 擁有活躍的社區(qū)和龐大的生態(tài)系統(tǒng),這意味著你可以輕松地獲取到豐富的文檔、教程、示例代碼和解決方案。此外,Flink 還與其他流行的開源項(xiàng)目(如Apache Kafka、Elasticsearch)深度集成,提供了更多的功能和靈活性。
4.支持的連接器
?5.支持的 Flink 版本
?
6.Flink CDC特性
支持讀取數(shù)據(jù)庫(kù)快照,即使出現(xiàn)故障也能繼續(xù)讀取binlog,并進(jìn)行Exactly-once處理
DataStream API 的 CDC 連接器,用戶可以在單個(gè)作業(yè)中使用多個(gè)數(shù)據(jù)庫(kù)和表的更改,而無(wú)需部署 Debezium 和 Kafka
Table/SQL API 的 CDC 連接器,用戶可以使用 SQL DDL 創(chuàng)建 CDC 源來(lái)監(jiān)視單個(gè)表上的更改
下表顯示了連接器的當(dāng)前特性:
?
7.用法實(shí)例
7.1DataStream API 的用法(推薦)
請(qǐng)嚴(yán)格按照上面的《5.支持的 Flink 版本》搭配來(lái)使用Flink CDC
<properties><flink.version>1.13.0</flink.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flinkcdc.version}</version>
</dependency>
<!-- flink核心API -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version>
</dependency>
請(qǐng)?zhí)崆伴_啟MySQL中的binlog,配置my.cnf文件,重啟mysqld服務(wù)即可
my.cnf
[client]
default_character_set=utf8
[mysqld]
server-id=1
collation_server=utf8_general_ci
character_set_server=utf8
log-bin=mysql-bin
binlog_format=row
expire_logs_days=30
ddl&dml.sql
create table test_cdc
(id int not nullprimary key,name varchar(100) null,age int null
);INSERT INTO flink.test_cdc (id, name, age) VALUES (1, 'Daniel', 25);
INSERT INTO flink.test_cdc (id, name, age) VALUES (2, 'David', 38);
INSERT INTO flink.test_cdc (id, name, age) VALUES (3, 'James', 16);
INSERT INTO flink.test_cdc (id, name, age) VALUES (4, 'Robert', 27);
FlinkDSCDC.java
package com.daniel.util;import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @Author Daniel* @Date: 2023/7/25 10:03* @Description DataStream API CDC**/
public class FlinkDSCDC {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().hostname("localhost").port(3306).username("root").password("123456").databaseList("flink")// 這里一定要是db.table的形式.tableList("flink.test_cdc").deserializer(new StringDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);dataStreamSource.print();env.execute("FlinkDSCDC");}
}
UPDATE flink.test_cdc t SET t.age = 24 WHERE t.id = 1;
UPDATE flink.test_cdc t SET t.name = 'Andy' WHERE t.id = 3;
打印出的日志
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=1,name=Daniel,age=25},after=Struct{id=1,name=Daniel,age=24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=7989,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=4}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=3,name=James,age=16},after=Struct{id=3,name=Andy,age=16},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=8113,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
可以得出的結(jié)論:
1、日志中的數(shù)據(jù)變化操作類型(op)可以表示為 ‘u’,表示更新操作。在第一條日志中,發(fā)生了一個(gè)更新操作,對(duì)應(yīng)的記錄的 key 是 id=1,更新前的數(shù)據(jù)是 {id=1, name=Daniel, age=25},更新后的數(shù)據(jù)是 {id=1, name=Daniel, age=24}。在第二條日志中,也發(fā)生了一個(gè)更新操作,對(duì)應(yīng)的記錄的 key 是 id=3,更新前的數(shù)據(jù)是 {id=3, name=James, age=16},更新后的數(shù)據(jù)是 {id=3, name=Andy, age=16}。
2、每條日志還提供了其他元數(shù)據(jù)信息,如數(shù)據(jù)源(source)、版本號(hào)(version)、連接器名稱(connector)、時(shí)間戳(ts_ms)等。這些信息可以幫助我們追蹤記錄的來(lái)源和處理過(guò)程。
3、日志中的 sourceOffset 包含了一些關(guān)鍵信息,如事務(wù)ID(transaction_id)、文件名(file)、偏移位置(pos)等。這些信息可以用于確保數(shù)據(jù)的準(zhǔn)確順序和一致性。
7.2Table/SQL API的用法
FlinkSQLCDC.java
package com.daniel.util;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @Author Daniel* @Date: 2023/7/25 15:25* @Description**/
public class FlinkSQLCDC {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("CREATE TABLE test_cdc (" +" id int primary key," +" name STRING," +" age int" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'scan.startup.mode' = 'latest-offset'," +" 'hostname' = 'localhost'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '123456'," +" 'database-name' = 'flink'," +" 'table-name' = 'test_cdc'" +")");Table table = tableEnv.sqlQuery("select * from test_cdc");DataStream<Tuple2<Boolean, Row>> dataStreamSource = tableEnv.toRetractStream(table, Row.class);dataStreamSource.print();env.execute("FlinkSQLCDC");}
}
UPDATE flink.test_cdc t SET t.age = 55 WHERE t.id = 2;
UPDATE flink.test_cdc t SET t.age = 22 WHERE t.id = 3;
UPDATE flink.test_cdc t SET t.name = 'Alice' WHERE t.id = 4;
UPDATE flink.test_cdc t SET t.age = 18 WHERE t.id = 1;
INSERT INTO flink.test_cdc (id, name, age) VALUES (5, 'David', 29);
打印出的日志
(false,-U[2, David, 38])
(true,+U[2, David, 55])
(false,-U[3, Andy, 16])
(true,+U[3, Andy, 22])
(false,-U[4, Robert, 27])
(true,+U[4, Alice, 27])
(false,-U[1, Daniel, 24])
(true,+U[1, Daniel, 18])
(true,+I[5, David, 29])