PySpark,作为 Apache Spark 的 Python API,使得处理和分析大数据变得更加高效且易于访问。本章详细讲解了PySpark 的基本概念和架构以及据的输入与输出操作。
①定义
Apache Spark 是一个用于大规模数据处理的统一分析引擎。简单来说,Spark 是一款分布式计算框架,能够调度成百上千的服务器集群,以处理 TB、PB 乃至 EB 级别的海量数据。
作为全球顶级的分布式计算框架,Spark 支持多种编程语言进行开发,其中 Python 语言是 Spark 特别支持的重点方向。
Spark 对 Python 的支持主要体现在第三方库 PySpark 上。PySpark 是由Spark 官方开发的一款 Python 库,允许开发者使用 Python 代码完成 Spark 任务。
PySpark 不仅可以作为独立的 Python 库使用,还能将程序提交到 Spark 集群进行大规模的数据处理。
Python 的应用场景和就业方向相当广泛,其中大数据开发和人工智能是最为突出的方向。
②安装PySpark库
电脑输入Win+R打开运行窗口→在运行窗口输入“cmd”→点击“确定”→输入pip install pyspark
③编程模型
PySpark 的编程流程主要分为以下三个步骤:
准备数据到RDD → RDD迭代计算 → RDD导出为列表、元组、字典、文本文件或数据库等。
④构建PySpark执行环境入口对象
SparkContext
是PySpark的入口点,负责与 Spark 集群的连接,并提供了创建 RDD(弹性分布式数据集)的接口。
要使用 PySpark 库完成数据处理,首先需要构建一个执行环境的入口对象,该对象是 SparkContext 类的实例。创建 SparkContext 对象后,便可开始进行数据处理和分析。
# 导包
# SparkConf:用于配置Spark应用的参数
# SparkContext:用于连接到Spark集群的入口点,负责协调整个Spark应用的运行
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象,用于设置 Spark 程序的配置
# local[*]表示在本地运行Spark
# [*]表示使用系统中的所有可用核心。这适合于开发和测试。
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
SparkConf 类的常用方法:
方法 | 描述 |
---|---|
| 设置 Spark 的运行模式 |
| 设置 Spark 应用程序的名称,在 Spark UI 中显示 |
| 设置任意的配置参数,通过键-值对的方式设置配置项 |
| 批量设置多个配置项,接收包含键-值对的列表或元组 |
| 设置 executor 的环境变量 |
| 获取指定键的配置值,若不存在,则返回默认值 |
| 检查配置中是否包含某个键 |
| 清空所有设置的配置项 |
| 获取所有的配置项,以键-值对的形式返回 |
| 可设置任何有效的 Spark 配置选项 |
①RDD对象
如下图所示,PySpark 支持多种格式的数据输入,并在输入完成后生成一个 RDD 对象。
RDD 的全称是弹性分布式数据集(Resilient Distributed Datasets),它是 PySpark 中数据计算的载体,具备以下功能:
RDD 具有迭代计算特性,RDD的数据计算方法,返回值依旧是RDD对象。
②Python数据容器转RDD对象
在 PySpark 中,可以通过 SparkContext 对象的 parallelize 方法将 list、tuple、set、dict 和 str 转换为 RDD 对象。
parallelize()
:用于将本地集合(即 Python 的原生数据结构)转换为 RDD 对象。
方法签名:
SparkContext.parallelize(collection, numSlices=None)
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize((1,2,3,4,5))
rdd3=sc.parallelize("abcdefg")
rdd4=sc.parallelize({1,2,3,4,5})
rdd5=sc.parallelize({"key1":"value1","key2":"value=2"})
# 使用collect()方法查看RDD里面有什么内容
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
输出结果: 1, 2, 3, 4, 5 1, 2, 3, 4, 5 'a', 'b', 'c', 'd', 'e', 'f', 'g' 1, 2, 3, 4, 5 'key1', 'key2'
【注意】
③读取文件转RDD对象
在 PySpark 中,可通过 SparkContext 的 textFile 成员方法读取文本文件并生成RDD对象。
textFile()
:用于读取文本文件并将其内容作为 RDD(弹性分布式数据集)加载。
方法签名:textFile(path, minPartitions=None)
例如:电脑D盘中有一个test.txt文本文件,内容如下:
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 使用textFile方法,读取文件数据加载到Spark内,使其成为RDD对象
rdd=sc.textFile("D:/test.txt")
print(rdd.collect())
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
输出结果: 'Hello python!', '你好 Python!!!', '123456'
①collect算子
功能:
将分布在集群上的所有 RDD 元素收集到驱动程序(Driver)节点,从而形成一个普通的 Python 列表
用法:
rdd.collect()
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,6])
# collect 算子,输出RDD为List对象
# print(rdd) 输出的是类名,输出结果:ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
rdd_list=rdd.collect()
print(rdd_list)
print(type(rdd_list))
sc.stop()
输出结果:
1, 2, 3, 4, 5, 6
<class 'list'>
②reduce算子
功能:
将 RDD 中的元素两两应用指定的聚合函数,最终合并为一个值,适用于需要归约操作的场景。
用法:
rdd.reduce(lambda a, b: a + b)
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])
# reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda a,b:a+b)
print(num)
sc.stop()
输出结果:
15
【分析】
③take算子
功能:
从 RDD 中获取指定数量的元素,以列表形式返回,同时不会将所有数据传回驱动。如果指定的元素数量超出 RDD 元素数量,则返回所有元素。
用法:
rdd.take(n)
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])
# take算子,取出RDD前N个元素并组成list返回
take_list=rdd.take(3)
print(take_list)
sc.stop()
输出结果:
1, 2, 3
④count算子
功能:
返回 RDD 中元素的总个数。
用法:
rdd.count()
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])
# count算子,统计rdd内有多少条数据,返回值为数字
num_count=rdd.count()
print(f"rdd内有{num_count}个元素")
sc.stop()
输出结果:
rdd内有5个元素
⑤saveAsTextFile算子
功能:
将 RDD 中的数据写入文本文件中。
用法:
rdd.saveAsTextFile(path)
调用保存文件的算子,需配置Hadoop依赖,配置方法如下:
from pyspark import SparkConf, SparkContext
# os用于操作系统级功能,这里用来设置环境变量
import os
# 指定 PySpark 使用的 Python 解释器路径
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
# 指定 Hadoop 的安装目录
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备RDD1,传入numSlices参数为1,数据集划分为一个切片
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)
# 准备RDD2,传入numSlices参数为1,数据集划分为一个切片
rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)
# 准备RDD3,传入numSlices参数为1,数据集划分为一个切片
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)
# 输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")
打开output2文本文件,输出结果如下:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。