Spark 批式查询

最近更新时间:2026-04-16 11:28:52

我的收藏

概述

Setats 支持通过 Spark 进行批式数据查询。用户可以使用 Spark SQL 直接访问 Setats 表中的最新快照数据,用于离线分析、明细抽样、聚合统计和结果回查等场景。

前提条件

已创建 Setats 集群并完成 Warehouse 配置。
已获取 Setats Spark Connector Jar 包。
已确认 Spark 集群可访问 Warehouse 底层存储,以及 Hive Metastore(若使用 Hive Catalog)。
已将所需 Jar 包部署到 Spark 节点本地目录或 HDFS 可访问路径。
如果使用 EMR Spark 3.3 + Hive Catalog + COS 加速桶,建议提前准备:
setats-spark-bundle-<spark-version>_<scala-version>-<connector-version>.jar
Hive Metastore 可用地址
Warehouse 路径,例如 cosn://<bucket>/warehouse/

环境准备

登录 Spark 节点

以 EMR Spark 3.3 为例,建议先登录 Spark 所在节点,并确认 Connector Jar 已上传到本地或 HDFS。
如果 Jar 存放在本地目录,可直接通过 --jars 引用;如果存放在 HDFS,也可以在启动参数中引用对应路径。

启动 Spark SQL

以下示例使用 Hive Catalog:
sh /usr/local/service/spark/bin/spark-sql \\
--jars /usr/local/service/spark/setats-spark-bundle-3.3_2.12-<connector-version>.jar \\
--conf spark.sql.setats.force.read.from.service=true \\
--conf spark.sql.setats.force.read.service.data=true \\
--conf spark.sql.session.timeZone='UTC' \\
--conf spark.driver.cores=4 \\
--conf spark.driver.memory=4g \\
--conf spark.executor.memory=4g \\
--conf spark.executor.cores=4 \\
--conf spark.sql.extensions=com.tencent.oceanus.spark.extensions.SetatsSparkSessionExtensions \\
--conf spark.sql.catalog.setats=com.tencent.oceanus.spark.SparkCatalog \\
--conf spark.sql.catalog.setats.type=hive \\
--conf spark.sql.catalog.setats.warehouse=cosn://<bucket>/warehouse/ \\
--conf spark.sql.catalog.setats.uri=thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004
如果使用 Hadoop Catalog,可将 spark.sql.catalog.setats.type 改为 hadoop,并根据实际情况省略 uri。

Spark Catalog 参数说明

参数
说明
spark.sql.setats.force.read.from.service
开启元数据读 Setats 服务
spark.sql.setats.force.read.service.data
开启内存数据读 Setats 服务
spark.sql.session.timeZone
Spark SQL 会话时区。进行时间旅行查询时建议设置为 UTC,避免按时间点读取历史快照时出现时区偏移
spark.sql.extensions
Setats Spark Session 扩展类:com.tencent.oceanus.spark.extensions.SetatsSparkSessionExtensions
spark.sql.catalog.setats
Setats Spark Catalog 实现类:com.tencent.oceanus.spark.SparkCatalog
spark.sql.catalog.setats.type
Catalog 类型,如 hive 或 hadoop
spark.sql.catalog.setats.warehouse
Setats Warehouse 地址
spark.sql.catalog.setats.uri
Hive Metastore 地址,仅 Hive Catalog 需要

查询示例

基本查询:
SELECT *
FROM `setats`.`testdb`.`demo_setats_table1`;
指定分区查询:
SELECT *
FROM `setats`.`testdb`.`demo_setats_table1`
WHERE dt = '20260319';
聚合查询:
SELECT
dt,
COUNT(*) AS cnt
FROM `setats`.`testdb`.`demo_setats_table1`
GROUP BY dt;
查询同步结果表示例:
SELECT *
FROM `setats`.`testdb`.`mysql_user_behavior_sink`;

系统表

Spark 批式查询支持访问 Setats 系统表,用于查看表的快照、Manifest、数据文件和索引文件等元数据信息。当前支持的系统表如下:
系统表
说明
snapshots
查询表的快照历史
manifests
查询 Manifest 文件信息
manifests_detail
查询 Manifest 明细信息
files
查询当前快照对应的数据文件信息
files_with_dv
查询带删除向量的数据文件信息
index_files
查询索引文件信息
bucket_manifests
查询 Bucket 与 Manifest 的关联信息
例如,可通过系统表查看目标表的快照历史:
SELECT *
FROM `setats`.`testdb`.`demo_setats_table1`.`snapshots`;

分区信息查询

当前在 Spark SQL 中查询分区信息时,需要使用 SHOW PARTITIONS 语句:
SHOW PARTITIONS `setats`.`testdb`.`demo_setats_table1`;

时间旅行

Setats 支持在 Spark 中基于历史快照执行时间旅行查询,适用于历史数据回查、结果核对和问题定位等场景。
如果需要按快照版本查询历史数据,建议先通过 snapshots 系统表确认目标快照,再执行时间旅行查询。
例如:
SELECT *
FROM `setats`.`testdb`.`demo_setats_table1`
VERSION AS OF <snapshot_id>;
如果需要按时间点查询历史数据,可使用时间点方式访问历史快照:
SELECT *
FROM `setats`.`testdb`.`demo_setats_table1`
TIMESTAMP AS OF '<yyyy-MM-dd HH:mm:ss>';

表名格式

Spark 中访问 Setats 表的格式为:
`<catalog_name>`.`<database>`.`<table_name>`
例如:setats.testdb.demo_setats_table1