西安 網(wǎng)站搭建深圳seo網(wǎng)絡(luò)優(yōu)化公司
文章目錄
- Spark與Iceberg集成之常用存儲過程
- 調(diào)用語法
- 調(diào)用樣例
- 表快照管理
- 快照回滾
- 根據(jù)snapshotid進(jìn)行回滾
- 根據(jù)timestamp進(jìn)行回滾
- 設(shè)置表當(dāng)前生效的快照
- 表元數(shù)據(jù)管理
- 設(shè)置快照過期時(shí)間
- 清除孤島文件
- 重寫數(shù)據(jù)文件
- 運(yùn)用參數(shù)示例
- options
- General Options
- Options for sort strategy
- Options for sort strategy with zorder sort_order
- 重寫清單文件
- 重寫位置刪除文件
- Options
Spark與Iceberg集成之常用存儲過程
spark與iceberg集成后,可以通過內(nèi)置的存儲過程來進(jìn)行表的管理。使用CALL
來調(diào)用存儲過程。所有的存儲過程在system
的命名空間中。
官網(wǎng)地址
由于表遷移功能的風(fēng)險(xiǎn)較大,所以不去進(jìn)行表的遷移,使用重建Iceberg表,重寫數(shù)據(jù)的方式進(jìn)行切換。
調(diào)用語法
catalog_name代表catalog的名稱,procedure_name代表存儲過程的名稱,參數(shù)可以通過指定參數(shù)名的方式入?yún)?#xff0c;也可以使用位移的方式入?yún)ⅰ?/p>
CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1);
CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n);
調(diào)用樣例
SparkSession spark = SparkSession.builder().master("local").appName("Iceberg spark example").config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions").config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog").config("spark.sql.catalog.local.type", "hadoop") //指定catalog 類型.config("spark.sql.catalog.local.warehouse", "iceberg_warehouse").getOrCreate();spark.sql("CALL local.system.rollback_to_snapshot('iceberg_db.table2', 3285133177610707025) ");
表快照管理
快照回滾
根據(jù)snapshotid進(jìn)行回滾
存儲過程名 rollback_to_snapshot
參數(shù)名稱 | 是否必填 | 參數(shù)類型 | 備注 |
---|---|---|---|
table | ?? | string | 表名 |
snapshot_id | ?? | long | metadata 文件中snapshots 中的快照子項(xiàng)snapshot-id 的值 |
根據(jù)timestamp進(jìn)行回滾
存儲過程名 rollback_to_timestamp
參數(shù)名稱 | 是否必填 | 參數(shù)類型 | 備注 |
---|---|---|---|
table | ?? | string | 表名 |
timestamp | ?? | timestamp | metadata 文件中snapshots 中的快照子項(xiàng)timestamp-ms 的值 |
設(shè)置表當(dāng)前生效的快照
存儲過程名 set_current_snapshot
參數(shù)名稱 | 是否必填 | 參數(shù)類型 | 備注 |
---|---|---|---|
table | ?? | string | 表名 |
snapshot_id | long | 取metadata 文件中snapshots 中的快照子項(xiàng)snapshot-id 的值 | |
ref | string | 快照引用,取metadata 文件中refs 中的分支名稱或者標(biāo)記名稱 |
?? snapshot_id
與ref
只能取其一。
表元數(shù)據(jù)管理
設(shè)置快照過期時(shí)間
Iceberg中的每次write
/update
/delete
/upsert
/compaction
都會生成一個(gè)新快照,同時(shí)保留舊數(shù)據(jù)和元數(shù)據(jù),以便進(jìn)行快照隔離和時(shí)間旅行。expire_snapshots
過程可用于刪除不再需要的舊快照及其文件。
這個(gè)過程將刪除舊快照和那些舊快照唯一需要的數(shù)據(jù)文件。這意味著expire_snapshots
過程永遠(yuǎn)不會刪除未過期快照仍然需要的文件。
存儲過程名 expire_snapshots
參數(shù)名稱 | 是否必填 | 參數(shù)類型 | 備注 |
---|---|---|---|
table | ?? | string | 表名 |
older_than | ? | timestamp | 超過該時(shí)限的快照將被刪除 ,默認(rèn)是: 5天前的被刪除 |
retain_last | int | 忽略 older_than 作用, 而保留的快照數(shù)量 (defaults to 1) | |
max_concurrent_deletes | int | 用于刪除文件操作的線程池大小(默認(rèn)情況下不使用線程池) | |
stream_results | boolean | 為true 時(shí),刪除文件將按 RDD 分區(qū)發(fā)送給 Spark 驅(qū)動程序(默認(rèn)情況下,所有文件都將發(fā)送給 Spark 驅(qū)動程序)。建議將該選項(xiàng)設(shè)置為 true ,以防止 Spark 驅(qū)動程序因文件大小而發(fā)生 OOM。 | |
snapshot_ids | array of long | 要過期的快照 ID 數(shù)組 |
如果省略 older_than
和 retain_last
,則將使用表的expiration properties。仍被分支或標(biāo)記引用的快照不會被刪除。默認(rèn)情況下,分支和標(biāo)記永不過期,但可以使用表屬性 history.expire.max-ref-age-ms
更改其保留策略。main
分支永不過期。
?? 使用此存儲過程時(shí),必須增加stream_results
且值為true
。
清除孤島文件
用于刪除未在iceberg表的任何元數(shù)據(jù)文件中引用的文件,因此可視為 “孤島”。
存儲過程名 remove_orphan_files
參數(shù)名稱 | 是否必填 | 參數(shù)類型 | 備注 |
---|---|---|---|
table | ?? | string | 表名 |
older_than | ? | timestamp | 刪除在此時(shí)間戳之前創(chuàng)建的孤島文件 (Defaults to 3 days ago) |
location | string | 查找文件的目錄 (defaults to the table’s location) | |
dry_run | boolean | 預(yù)執(zhí)行,若值為true ,實(shí)際并未刪除文件 (defaults to false) | |
max_concurrent_deletes | int | 用于刪除文件操作的線程池大小(默認(rèn)情況下不使用線程池) | |
file_list_view | string | 查找文件的數(shù)據(jù)集(跳過目錄列表) | |
equal_schemes | map | 被視為相同的文件系統(tǒng)方案的映射。鍵是一個(gè)以逗號分隔的方案列表,值是一個(gè)方案 (defaults to map('s3a,s3n','s3') ). | |
equal_authorities | map | 被視為等同的文件系統(tǒng)權(quán)限映射。鍵是以逗號分隔的權(quán)限列表,值是權(quán)限。 | |
prefix_mismatch_mode | string | 位置前綴(方案/授權(quán))不匹配時(shí)的操作行為: ERROR- 拋出異常. (default) IGNORE - 啥也不干. DELETE - 刪除文件. |
重寫數(shù)據(jù)文件
Iceberg 在一個(gè)表格中跟蹤每個(gè)數(shù)據(jù)文件。數(shù)據(jù)文件越多,存儲在清單文件中的元數(shù)據(jù)也就越多,而數(shù)據(jù)文件過小則會導(dǎo)致不必要的元數(shù)據(jù)量和文件打開成本,從而降低查詢效率。
Iceberg 可以使用 Spark 的 rewriteDataFiles 操作并行壓縮數(shù)據(jù)文件。這將把小文件合并為大文件,以減少元數(shù)據(jù)開銷和運(yùn)行時(shí)文件打開成本。
存儲過程名 rewrite_data_files
參數(shù)名稱 | 是否必填 | 參數(shù)類型 | 備注 |
---|---|---|---|
table | ?? | string | 表名 |
strategy | string | 策略名稱 - 二進(jìn)制包或排序。默認(rèn)為 binpack 策略 | |
sort_order | string | 對于 Zorder,請?jiān)?zorder() 中使用逗號分隔的列列表。例如:zorder(c1,c2,c3)。否則,以逗號分隔的排序順序格式為(ColumnName SortDirection NullOrder)。其中,SortDirection 可以是 ASC 或 DESC。NullOrder 可以是 NULLS FIRST 或 NULLS LAST。默認(rèn)為表格的排序順序 | |
options | ? | map | 支持一些配置項(xiàng) |
where | ? | string | 用于篩選文件的字符串。請注意,所有可能包含匹配過濾器數(shù)據(jù)的文件都會被選中進(jìn)行重寫 |
運(yùn)用參數(shù)示例
spark.sql("CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2'))");
options
General Options
屬性名稱 | Default Value | 備注 |
---|---|---|
max-concurrent-file-group-rewrites | 5 | 同時(shí)重寫的最大文件組數(shù) |
partial-progress.enabled | false | 是否“分步提交”,允許在完成整個(gè)重寫之前提交文件組 |
partial-progress.max-commits | 10 | 如果啟用了部分進(jìn)度,允許此重寫產(chǎn)生的最大提交次數(shù) |
use-starting-sequence-number | true | 使用壓縮開始時(shí)快照的序列號,而不是新生成快照的序列號 |
rewrite-job-order | none | |
target-file-size-bytes | 536870912 (512 MB, default value of write.target-file-size-bytes from table properties) | 重寫輸出文件大小 |
min-file-size-bytes | 75% of target file size | 低于此閾值的文件將被考慮重寫,而不考慮任何其他標(biāo)準(zhǔn) |
max-file-size-bytes | 180% of target file size | 文件大小超過此閾值時(shí),將考慮重寫,而不考慮任何其他標(biāo)準(zhǔn) |
min-input-files | 5 | 任何文件組的文件數(shù)超過這個(gè)數(shù)量,都將被重寫,而不考慮其他標(biāo)準(zhǔn) |
rewrite-all | false | 強(qiáng)制重寫所有提供的文件,優(yōu)先于其他選項(xiàng) |
max-file-group-size-bytes | 107374182400 (100GB) | 單個(gè)文件組中應(yīng)重寫的最大數(shù)據(jù)量。整個(gè)重寫操作會根據(jù)分區(qū)和分區(qū)內(nèi)文件組的大小分成若干塊。這有助于分解超大分區(qū)的重寫,否則由于群集的資源限制,這些分區(qū)可能無法重寫。 |
delete-file-threshold | 2147483647 | 考慮重寫數(shù)據(jù)文件所需的最少刪除次數(shù) |
Options for sort strategy
屬性名稱 | Default Value | 備注 |
---|---|---|
compression-factor | 1.0 | shuffle 分區(qū)的數(shù)量以及 Spark 排序創(chuàng)建的輸出文件數(shù)量取決于文件重寫器使用的輸入數(shù)據(jù)文件的大小。由于壓縮,磁盤文件大小可能無法準(zhǔn)確代表輸出文件的大小。該參數(shù)允許用戶調(diào)整用于估算實(shí)際輸出數(shù)據(jù)大小的文件大小。系數(shù)大于 1.0 會生成比我們根據(jù)磁盤文件大小所預(yù)期的更多的文件。如果數(shù)值小于 1.0,根據(jù)磁盤上的文件大小,生成的文件將比我們預(yù)期的少 |
shuffle-partitions-per-file | 1 | 每個(gè)輸出文件要使用的分區(qū)數(shù)量。iceberg會使用自定義的合并操作,將這些已排序的分區(qū)拼接成一個(gè)已排序的文件。 |
Options for sort strategy with zorder sort_order
屬性名稱 | Default Value | 備注 |
---|---|---|
var-length-contribution | 8 | 從長度可變的輸入列(字符串、二進(jìn)制)中考慮的字節(jié)數(shù) |
max-output-size | 2147483647 | ZOrder 算法中的字節(jié)交錯(cuò)量 |
重寫清單文件
重寫表的清單,優(yōu)化掃描規(guī)劃。
清單中的數(shù)據(jù)文件按分區(qū)規(guī)范中的字段排序。該程序使用 Spark 作業(yè)并行運(yùn)行。
存儲過程名 rewrite_manifests
參數(shù)名稱 | 是否必填 | 參數(shù)類型 | 備注 |
---|---|---|---|
table | ?? | string | 表名 |
use_caching | ? | boolean | 在運(yùn)行過程中使用 Spark 緩存(默認(rèn)為 true) |
spec_id | ? | int | 要重寫的清單的規(guī)格 ID(默認(rèn)為當(dāng)前規(guī)格 ID) |
重寫位置刪除文件
Iceberg 可以重寫位置刪除文件,這樣做有兩個(gè)目的:
- 小型壓縮: 將小的位置刪除文件壓縮成大文件。這樣可以減少存儲在清單文件中的元數(shù)據(jù)大小,并減少打開小的刪除文件的開銷。
- 刪除懸而未決的刪除記錄: 過濾掉引用不再有效的數(shù)據(jù)文件的位置刪除記錄。重寫數(shù)據(jù)文件后,指向重寫數(shù)據(jù)文件的位置刪除記錄并不總是被標(biāo)記為刪除,而是會繼續(xù)被表的實(shí)時(shí)快照元數(shù)據(jù)跟蹤。這就是所謂的 “懸空刪除 ”問題。
存儲過程名 rewrite_position_delete_files
參數(shù)名稱 | 是否必填 | 參數(shù)類型 | 備注 |
---|---|---|---|
table | ?? | string | 表名 |
options | ? | map | 支持一些配置項(xiàng) |
在重寫過程中,懸掛刪除總是會被過濾掉。
Options
屬性名稱 | Default Value | 備注 |
---|---|---|
max-concurrent-file-group-rewrites | 5 | 同時(shí)重寫的最大文件組數(shù) |
partial-progress.enabled | false | 是否“分步提交”,允許在完成整個(gè)重寫之前提交文件組 |
partial-progress.max-commits | 10 | 如果啟用了部分進(jìn)度,允許此重寫產(chǎn)生的最大提交次數(shù) |
rewrite-job-order | none | |
target-file-size-bytes | 67108864 (64MB, default value of write.delete.target-file-size-bytes from table properties) | 重寫輸出文件大小 |
min-file-size-bytes | 75% of target file size | 低于此閾值的文件將被考慮重寫,而不考慮任何其他標(biāo)準(zhǔn) |
max-file-size-bytes | 180% of target file size | 文件大小超過此閾值時(shí),將考慮重寫,而不考慮任何其他標(biāo)準(zhǔn) |
min-input-files | 5 | 任何文件組的文件數(shù)超過這個(gè)數(shù)量,都將被重寫,而不考慮其他標(biāo)準(zhǔn) |
rewrite-all | false | 強(qiáng)制重寫所有提供的文件,優(yōu)先于其他選項(xiàng) |
max-file-group-size-bytes | 107374182400 (100GB) | 單個(gè)文件組中應(yīng)重寫的最大數(shù)據(jù)量。整個(gè)重寫操作會根據(jù)分區(qū)和分區(qū)內(nèi)文件組的大小分成若干塊。這有助于分解超大分區(qū)的重寫,否則由于群集的資源限制,這些分區(qū)可能無法重寫。 |