DLC PySpark

最近更新时间:2024-05-17 11:03:52

我的收藏
注意:
需要绑定 DLC 引擎。目前 DLC PySpark 支持 Spark 作业引擎。引擎内核详情可参见 DLC 引擎内核版本

功能说明

在 WeData 中创建一个 DLC PySpark 的任务,提交到 WeData 调度平台以及 DLC 引擎执行。

任务参数说明

在 DLC PySpark 的任务属性中可以添加 DLC PySpark 任务数据访问策略、入口参数、依赖资源、Spark 任务的 conf 参数以及任务镜像。
参数名称
参数说明
数据访问策略
必填,任务执行过程中访问cos数据的安全策略,详情可参考 DLC 配置数据访问策略
入口参数
非必填,程序的入口参数,支持填写多个。多个参数使用“空格”分割。
依赖资源
非必填,支持选择 --py-files、--files、--archives,每一种资源可以输入多个 cos 路径,多个路径用逗号 (,) 分割。
conf 参数
非必填,spark. 开头的参数,按照 k=v 格式填写,多个参数换行填写。示例:spark.network.timeout=120s。
任务镜像
任务执行的镜像,如果任务要使用特定的镜像可以选择 DLC 内置镜像和自定义镜像。
详情可参考 DLC Spark 镜像
资源配置
使用集群资源配置:使用集群默认的资源配置参数。
自定义:自定义任务的资源使用参数,包括 executor 大小、driver 大小、executor 个数。

示例代码

from os.path import abspath

from pyspark.sql import SparkSession

if __name__ == "__main__":
spark = SparkSession \\
.builder \\
.appName("Operate DB Example") \\
.getOrCreate()
# 1.建数据库
spark.sql("CREATE DATABASE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py` COMMENT 'demo test' ")
# 2.建内表
spark.sql("CREATE TABLE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py`.`test`(`id` int,`name` string,`age` int) ")
# 3.写内数据
spark.sql("INSERT INTO `DataLakeCatalog`.`dlc_db_test_py`.`test` VALUES (1,'Andy',12),(2,'Justin',3) ")
# 4.查内数据
spark.sql("SELECT * FROM `DataLakeCatalog`.`dlc_db_test_py`.`test` ").show()
# 5.建外表
spark.sql("CREATE EXTERNAL TABLE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py`.`ext_test`(`id` int, `name` string, `age` int) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE LOCATION 'cosn://cos-bucket-name/ext_test' ")
# 6.写外数据
spark.sql("INSERT INTO `DataLakeCatalog`.`dlc_db_test_py`.`ext_test` VALUES (1,'Andy',12),(2,'Justin',3) ")
# 7.查外数据
spark.sql("SELECT * FROM `DataLakeCatalog`.`dlc_db_test_py`.`ext_test` ").show()
spark.stop()