前端優(yōu)化網(wǎng)站天津百度搜索網(wǎng)站排名
在快速發(fā)展的數(shù)據(jù)驅(qū)動時代,實時數(shù)據(jù)處理已經(jīng)成為企業(yè)決策和運(yùn)營的關(guān)鍵因素。特別是在處理來自各種數(shù)據(jù)源的信息時,如何確保數(shù)據(jù)的及時、準(zhǔn)確和高效同步變得尤為重要。本文著重介紹了如何利用 SqlServer CDC 源連接器在 SeaTunnel 框架下實現(xiàn) SQL Server 到其他數(shù)據(jù)系統(tǒng)的實時數(shù)據(jù)同步,這對于希望提升數(shù)據(jù)處理能力和實時數(shù)據(jù)分析的企業(yè)來說,具有重要的實踐意義。
SQL Server CDC
SqlServer CDC 源連接器
支持 SQL Server 版本
- 服務(wù)器:2019(或更高版本,僅供參考)
支持引擎
SeaTunnel Zeta
Flink
主要特性
- 批處理
- 流處理
- 精確一次
- 列投影
- 并行處理
- 支持用戶自定義分片
描述
SqlServer CDC 連接器允許從 SqlServer 數(shù)據(jù)庫讀取快照數(shù)據(jù)和增量數(shù)據(jù)。本文檔描述了如何設(shè)置 SqlServer CDC 連接器以在 SqlServer 數(shù)據(jù)庫上運(yùn)行 SQL 查詢。
支持的數(shù)據(jù)源信息
數(shù)據(jù)源 | 支持的版本 | 驅(qū)動 | URL | Maven |
---|---|---|---|---|
SqlServer |
| com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433;databaseName=column_type_test | 下載 |
安裝 Jdbc 驅(qū)動
請下載并將 SqlServer 驅(qū)動放在 ${SEATUNNEL_HOME}/lib/
目錄下。例如:cp mssql-jdbc-xxx.jar ${SEATUNNEL_HOME}/lib/
數(shù)據(jù)類型映射
SQL Server 數(shù)據(jù)類型 | SeaTunnel 數(shù)據(jù)類型 |
---|---|
CHAR VARCHAR NCHAR NVARCHAR STRUCT CLOB LONGVARCHAR LONGNVARCHAR | STRING |
BLOB | BYTES |
INTEGER | INT |
SMALLINT TINYINT | SMALLINT |
BIGINT | BIGINT |
FLOAT REAL | FLOAT |
DOUBLE | DOUBLE |
NUMERIC DECIMAL(column.length(), column.scale().orElse(0)) | DECIMAL(column.length(), column.scale().orElse(0)) |
TIMESTAMP | TIMESTAMP |
DATE | DATE |
TIME | TIME |
BOOLEAN BIT | BOOLEAN |
源選項
名稱 | 類型 | 必需 | 默認(rèn)值 | 描述 |
---|---|---|---|---|
username | 字符串 | 是 | - | 連接數(shù)據(jù)庫服務(wù)器時使用的用戶名。 |
password | 字符串 | 是 | - | 連接數(shù)據(jù)庫服務(wù)器時使用的密碼。 |
database-names | 列表 | 是 | - | 需要監(jiān)控的數(shù)據(jù)庫名。 |
table-names | 列表 | 是 | - | 表名為模式名和表名的組合(databaseName.schemaName.tableName)。 |
base-url | 字符串 | 是 | - | 必須包含數(shù)據(jù)庫的URL,如 "jdbc:sqlserver://localhost:1433;databaseName=test"。 |
startup.mode | 枚舉 | 否 | INITIAL | SqlServer CDC 消費(fèi)者的可選啟動模式,有效枚舉為 "initial"、"earliest"、"latest" 和 "specific"。 |
startup.timestamp | 長整型 | 否 | - | 從指定的紀(jì)元時間戳(以毫秒為單位)開始。 注意,當(dāng)使用 "startup.mode" 選項為 'timestamp' 時,此選項是必需的。 |
startup.specific-offset.file | 字符串 | 否 | - | 從指定的 binlog 文件名開始。 注意,當(dāng) "startup.mode" 選項使用 'specific' 時,此選項是必需的。 |
startup.specific-offset.pos | 長整型 | 否 | - | 從指定的 binlog 文件位置開始。 注意,當(dāng) "startup.mode" 選項使用 'specific' 時,此選項是必需的。 |
stop.mode | 枚舉 | 否 | NEVER | SqlServer CDC 消費(fèi)者的可選停止模式,有效枚舉為 "never"。 |
stop.timestamp | 長整型 | 否 | - | 從指定的紀(jì)元時間戳(以毫秒為單位)停止。 注意,當(dāng) "stop.mode" 選項使用 'timestamp' 時,此選項是必需的。 |
stop.specific-offset.file | 字符串 | 否 | - | 從指定的 binlog 文件名停止。 注意,當(dāng) "stop.mode" 選項使用 'specific' 時,此選項是必需的。 |
stop.specific-offset.pos | 長整型 | 否 | - | 從指定的 binlog 文件位置停止。 注意,當(dāng) "stop.mode" 選項使用 'specific' 時,此選項是必需的。 |
incremental.parallelism | 整型 | 否 | 1 | 增量階段中并行讀取器的數(shù)量。 |
snapshot.split.size | 整型 | 否 | 8096 | 表快照的分割大小(行數(shù)),快照期間的表會被分割成多個分片進(jìn)行讀取。 |
snapshot.fetch.size | 整型 | 否 | 1024 | 讀取表快照時每次輪詢的最大提取量。 |
server-time-zone | 字符串 | 否 | UTC | 數(shù)據(jù)庫服務(wù)器中的會話時區(qū)。 |
connect.timeout | 時長 | 否 | 30s | 連接器嘗試連接到數(shù)據(jù)庫服務(wù)器后等待超時的最大時間。 |
connect.max-retries | 整型 | 否 | 3 | 連接器嘗試建立數(shù)據(jù)庫服務(wù)器連接的最大重試次數(shù)。 |
connection.pool.size | 整型 | 否 | 20 | 連接池大小。 |
chunk-key.even-distribution.factor.upper-bound | 雙精度浮點(diǎn)型 | 否 | 100 | 分塊鍵分布因子的上界。此因子用于判斷表數(shù)據(jù)是否均勻分布。如果計算出的分布因子小于或等于此上界值(即 (MAX(id) - MIN(id) + 1) / 行數(shù)),則表分塊將被優(yōu)化為均勻分布。否則,如果分布因子更大,則表將被認(rèn)為是不均勻分布的,并且如果估計的分片數(shù)超過 sample-sharding.threshold 指定的值,將使用基于抽樣的分片策略。默認(rèn)值為 100.0。 |
chunk-key.even-distribution.factor.lower-bound | 雙精度浮點(diǎn)型 | 否 | 0.05 | 分塊鍵分布因子的下界。此因子用于判斷表數(shù)據(jù)是否均勻分布。如果計算出的分布因子大于或等于此下界值(即 (MAX(id) - MIN(id) + 1) / 行數(shù)),則表分塊將被優(yōu)化為均勻分布。否則,如果分布因子更小,則表將被認(rèn)為是不均勻分布的,并且如果估計的分片數(shù)超過 sample-sharding.threshold 指定的值,將使用基于抽樣的分片策略。默認(rèn)值為 0.05。 |
sample-sharding.threshold | 整型 | 否 | 1000 | 觸發(fā)抽樣分片策略的估計分片數(shù)閾值。當(dāng)分布因子超出 chunk-key.even-distribution.factor.upper-bound 和 chunk-key.even-distribution.factor.lower-bound 指定的范圍,并且估計的分片數(shù)(計算為近似行數(shù) / 分塊大小)超過此閾值時,將使用抽樣分片策略。這可以幫助更有效地處理大型數(shù)據(jù)集。默認(rèn)值為1000分片。 |
inverse-sampling.rate | 整型 | 否 | 1000 | 抽樣分片策略中使用的抽樣率的倒數(shù)。例如,如果這個值設(shè)置為1000,意味著抽樣過程中應(yīng)用了1/1000的抽樣率。這個選項提供了在控制抽樣粒度的靈活性,從而影響最終的分片數(shù)量。特別是在處理非常大的數(shù)據(jù)集時,更低的抽樣率是首選。默認(rèn)值為1000。 |
exactly_once | 布爾型 | 否 | true | 啟用精確一次語義。 |
debezium.* | 配置 | 否 | - | 將Debezium的屬性傳遞給用于從SqlServer服務(wù)器捕獲數(shù)據(jù)變化的Debezium嵌入式引擎。 查看Debezium的SqlServer連接器屬性獲取更多信息 |
format | 枚舉 | 否 | DEFAULT | SqlServer CDC 的可選輸出格式,有效枚舉為 "DEFAULT"、"COMPATIBLE_DEBEZIUM_JSON"。 |
common-options | 否 | - | 源插件的通用參數(shù),請參考源通用選項獲取詳細(xì)信息。 |
任務(wù)示例
初始讀取簡單示例
這是一個流模式CDC初始化讀取的示例,成功讀取表數(shù)據(jù)后將進(jìn)行增量讀取。以下SQL DDL僅供參考。
env {# 在此處設(shè)置引擎配置execution.parallelism = 1job.mode = "STREAMING"execution.checkpoint.interval = 5000
}source {# 僅用于測試和演示功能的示例源插件SqlServer-CDC {result_table_name = "customers"username = "sa"password = "Y.sa123456"startup.mode="initial"database-names = ["column_type_test"]table-names = ["column_type_test.dbo.full_types"]base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"}
}transform {
}sink {console {source_table_name = "customers"}
增量讀取簡單示例
這是一個增量閱讀示例,用于閱讀變更數(shù)據(jù)并打印。
env {# 在此處設(shè)置引擎配置execution.parallelism = 1job.mode = "STREAMING"execution.checkpoint.interval = 5000
}source {# 僅用于測試和演示功能的示例源插件SqlServer-CDC {# 設(shè)置精確一次讀取exactly_once=true result_table_name = "customers"username = "sa"password = "Y.sa123456"startup.mode="latest"database-names = ["column_type_test"]table-names = ["column_type_test.dbo.full_types"]base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"}
}transform {
}sink {console {source_table_name = "customers"}
}
隨著數(shù)據(jù)處理需求的不斷增長和實時數(shù)據(jù)同步的重要性日益凸顯,SqlServer CDC 源連接器在 SeaTunnel 生態(tài)系統(tǒng)中扮演著至關(guān)重要的角色。
通過本文的深入解析,我們希望您能夠更好地理解并利用這一強(qiáng)大工具,從而實現(xiàn)數(shù)據(jù)流的高效、穩(wěn)定和精準(zhǔn)同步。
無論您是數(shù)據(jù)工程師、系統(tǒng)架構(gòu)師還是業(yè)務(wù)分析師,掌握如何在 SeaTunnel 中部署和優(yōu)化 SQL Server CDC 連接器,都將為您的數(shù)據(jù)處理能力帶來顯著提升。
本文由 白鯨開源科技 提供發(fā)布支持!