说明
支持内核:SparkSQL。
适用表类型:外部 Iceberg 表、原生 Iceberg 表。
基本句法
CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1);CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n);
Snapshot 管理
rollback_to_snapshot
回滚快照到指定版本。
输入参数:表名和版本号。
CALL `DataLakeCatalog`.`system`.rollback_to_snapshot('validation.dempts', 1);
rollback_to_timestamp
回滚快照到指定时间戳。
输入参数:表名和时间戳。
CALL `DataLakeCatalog`.`system`.rollback_to_timestamp('validation.dempts', TIMESTAMP '2022-08-11 19:49:43.224');
set_current_snapshot
设置当前快照版本。
输入参数:表名和版本号。
CALL `DataLakeCatalog`.`system`.set_current_snapshot('validation.dempts', 1);
cherrypick_snapshot
从指定快照版本 cherrypick 到当前快照。
CALL `DataLakeCatalog`.`system`.cherrypick_snapshot('validation.dempts', 1);CALL `DataLakeCatalog`.`system`.cherrypick_snapshot(snapshot_id => 1, table => 'my_table' )
Metadata 管理
expire_snapshots
清理过期快照,减少小文件数。
CALL `Catalog`.`system`.expire_snapshots(table_name, [older_than], [retain_last], [max_concurrent_deletes], [stream_results]);
示例:
CALL `DataLakeCatalog`.`system`.expire_snapshots('validation.dempts', TIMESTAMP '2021-06-30 00:00:00.000', 100);
remove_orphan_files
移除不再被引用元数据文件。
CALL `Catalog`.`system`.remove_orphan_files(table_name, [older_than], [location], [dry_run], [max_concurrent_deletes]);
示例:
CALL `DataLakeCatalog`.`system`.remove_orphan_files(`table`=>'validation.dempts', dry_run=>TRUE);CALL `DataLakeCatalog`.`system`.remove_orphan_files(`table`=>'validation.dempts', `location`=>'cosn://channingdata-1305424723/example2/');CALL `DataLakeCatalog`.`system`.remove_orphan_files('validation.dempts', TIMESTAMP '2022-07-10 17:25:19.000');
remove_orphan_files
移除不再被引用元数据文件。
CALL `Catalog`.`system`.remove_orphan_files(table_name, [older_than], [location], [dry_run], [max_concurrent_deletes]);
示例:
CALL `DataLakeCatalog`.`system`.remove_orphan_files(`table`=>'validation.dempts', dry_run=>TRUE);CALL `DataLakeCatalog`.`system`.remove_orphan_files(`table`=>'validation.dempts', `location`=>'cosn://channingdata-1305424723/example2/');CALL `DataLakeCatalog`.`system`.remove_orphan_files('validation.dempts', TIMESTAMP '2022-07-10 17:25:19.000');
rewrite_data_files
数据文件合并重写,即小数据文件合并。
CALL `Catalog`.`system`.rewrite_data_files(table_name, [strategy], [sort_order], [options], [where]);
示例:
CALL `DataLakeCatalog`.`system`.rewrite_data_files('validation.dempts');CALL `DataLakeCatalog`.`system`.rewrite_data_files(`table`=>'validation.dempts', `strategy`=>'sort', `sort_order`=>'id DESC NULLS LAST,data ASC NULLS FIRST');CALL `DataLakeCatalog`.`system`.rewrite_data_files(`table`=>'validation.dempts', `options`=>map('min-input-files','2'));CALL `DataLakeCatalog`.`system`.rewrite_data_files(`table`=>'validation.dempts', `where`=>'id = 3 and data = "foo"');
rewrite_manifests
manifests 文件合并重写。
CALL `Catalog`.`system`.rewrite_manifests(table_name, [using_caching]);
示例:
CALL `DataLakeCatalog`.`system`.rewrite_manifests('validation.dempts');CALL `DataLakeCatalog`.`system`.rewrite_manifests('validation.dempts', FALSE);
ancestors_of
获取快照的血缘信息。
CALL `Catalog`.`system`.ancestors_of(table_name, [snapshot_id]);
示例:
CALL `DataLakeCatalog`.`system`.ancestors_of('validation.dempts');CALL `DataLakeCatalog`.`system`.ancestors_of('validation.dempts', 1);
数据表迁移管理
注意
原表必须为 Hive 表或 Spark 表。
snapshot
基于原始表创建轻量级的临时表,临时表直接复用原始表快照。
CALL `Catalog`.`system`.snapshot(source_table, table, [location], [properties]);
示例:
CALL `DataLakeCatalog`.`system`.snapshot('validation.table_01', 'validation.snap');CALL `DataLakeCatalog`.`system`.snapshot('validation.table_01', 'validation.snap2', 'cosn://channingdata-1305424723/example3/');
migrate
更新替换表属性。
CALL `Catalog`.`system`.migrate(table, [properties]);
示例:
CALL `DataLakeCatalog`.`system`.migrate('validation.table_01');CALL `DataLakeCatalog`.`system`.migrate('validation.table_01', map('data', 'name'));
add_files
直接从 hive 中加载数据文件,可指定数据文件到指定分区。
CALL `Catalog`.`system`.add_files(table, source_table, [partition_filter]);
示例:
CALL `DataLakeCatalog`.`system`.add_files(`table`=>'validation.table_02', `source_table`=>'validation.table_01');CALL `DataLakeCatalog`.`system`.add_files(`table`=>'validation.table_02', `source_table`=>'validation.table_01', `partition_filter`=>map('part_col', 'A'));