前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PySpark基础

PySpark基础

原创
作者头像
Heaven645
修改2024-08-13 00:25:40
751
修改2024-08-13 00:25:40
举报
文章被收录于专栏:Python学习

前言

PySpark,作为 Apache Spark 的 Python API,使得处理和分析大数据变得更加高效且易于访问。本章详细讲解了PySpark 的基本概念和架构以及据的输入与输出操作。

一、PySpark入门

①定义

Apache Spark 是一个用于大规模数据处理的统一分析引擎。简单来说,Spark 是一款分布式计算框架,能够调度成百上千的服务器集群,以处理 TB、PB 乃至 EB 级别的海量数据。

作为全球顶级的分布式计算框架,Spark 支持多种编程语言进行开发,其中 Python 语言是 Spark 特别支持的重点方向。

Spark 对 Python 的支持主要体现在第三方库 PySpark 上。PySpark 是由Spark 官方开发的一款 Python 库,允许开发者使用 Python 代码完成 Spark 任务。

PySpark 不仅可以作为独立的 Python 库使用,还能将程序提交到 Spark 集群进行大规模的数据处理。

Python 的应用场景和就业方向相当广泛,其中大数据开发和人工智能是最为突出的方向。

②安装PySpark库

电脑输入Win+R打开运行窗口→在运行窗口输入“cmd”→点击“确定”→输入pip install pyspark

③编程模型

PySpark 的编程流程主要分为以下三个步骤:

准备数据到RDD → RDD迭代计算 → RDD导出为列表、元组、字典、文本文件或数据库等。

  • 数据输入:通过 SparkContext 对象读取数据
  • 数据计算:将读取的数据转换为 RDD 对象,并调用 RDD 的成员方法进行迭代计算
  • 数据输出:通过 RDD 对象的相关方法将结果输出到列表、元组、字典、文本文件或数据库等

④构建PySpark执行环境入口对象

SparkContext是PySpark的入口点,负责与 Spark 集群的连接,并提供了创建 RDD(弹性分布式数据集)的接口。

要使用 PySpark 库完成数据处理,首先需要构建一个执行环境的入口对象,该对象是 SparkContext 类的实例。创建 SparkContext 对象后,便可开始进行数据处理和分析。

代码语言:python
代码运行次数:0
复制
# 导包
# SparkConf:用于配置Spark应用的参数
# SparkContext:用于连接到Spark集群的入口点,负责协调整个Spark应用的运行
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象,用于设置 Spark 程序的配置
# local[*]表示在本地运行Spark
# [*]表示使用系统中的所有可用核心。这适合于开发和测试。
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

SparkConf 类的常用方法:

方法

描述

setMaster(master)

设置 Spark 的运行模式

setAppName(name)

设置 Spark 应用程序的名称,在 Spark UI 中显示

set(key, value)

设置任意的配置参数,通过键-值对的方式设置配置项

setAll(pairs)

批量设置多个配置项,接收包含键-值对的列表或元组

setExecutorEnv(key, value)

设置 executor 的环境变量

get(key, defaultValue=None)

获取指定键的配置值,若不存在,则返回默认值

contains(key)

检查配置中是否包含某个键

clear()

清空所有设置的配置项

getAll()

获取所有的配置项,以键-值对的形式返回

set("spark.some.config.option", "value")

可设置任何有效的 Spark 配置选项

二、数据输入

①RDD对象

如下图所示,PySpark 支持多种格式的数据输入,并在输入完成后生成一个 RDD 对象。

RDD 的全称是弹性分布式数据集(Resilient Distributed Datasets),它是 PySpark 中数据计算的载体,具备以下功能:

  • 提供数据存储
  • 提供数据计算的各类方法

RDD 具有迭代计算特性,RDD的数据计算方法,返回值依旧是RDD对象。

②Python数据容器转RDD对象

在 PySpark 中,可以通过 SparkContext 对象的 parallelize 方法将 list、tuple、set、dict 和 str 转换为 RDD 对象。

parallelize() :用于将本地集合(即 Python 的原生数据结构)转换为 RDD 对象。

方法签名:

SparkContext.parallelize(collection, numSlices=None)

  • 参数collection: 可以是任何可迭代的数据结构(例如list、tuple、set、dict 或 str 的列表)
  • 参数numSlices: 可选参数,用于指定将数据划分为多少个分片
代码语言:python
代码运行次数:0
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize((1,2,3,4,5))
rdd3=sc.parallelize("abcdefg")
rdd4=sc.parallelize({1,2,3,4,5})
rdd5=sc.parallelize({"key1":"value1","key2":"value=2"})

# 使用collect()方法查看RDD里面有什么内容
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出结果: 1, 2, 3, 4, 5 1, 2, 3, 4, 5 'a', 'b', 'c', 'd', 'e', 'f', 'g' 1, 2, 3, 4, 5 'key1', 'key2'

【注意】

  • 对于字符串,parallelize 方法会将其拆分为单个字符并存入 RDD。
  • 对于字典,只有键会被存入 RDD 对象,值会被忽略。

③读取文件转RDD对象

在 PySpark 中,可通过 SparkContext 的 textFile 成员方法读取文本文件并生成RDD对象。

textFile():用于读取文本文件并将其内容作为 RDD(弹性分布式数据集)加载。

方法签名:textFile(path, minPartitions=None)

  • 参数path:要读取的文件的路径
  • 参数minPartitions:可选参数,用于指定数据划分的最小分片数

例如:电脑D盘中有一个test.txt文本文件,内容如下:

代码语言:python
代码运行次数:0
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 使用textFile方法,读取文件数据加载到Spark内,使其成为RDD对象
rdd=sc.textFile("D:/test.txt")
print(rdd.collect())
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

输出结果: 'Hello python!', '你好 Python!!!', '123456'

三、数据输出

①collect算子

功能:

将分布在集群上的所有 RDD 元素收集到驱动程序(Driver)节点,从而形成一个普通的 Python 列表

用法:

rdd.collect()

代码语言:python
代码运行次数:0
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,6])
# collect 算子,输出RDD为List对象
# print(rdd)  输出的是类名,输出结果:ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
rdd_list=rdd.collect()
print(rdd_list)
print(type(rdd_list))
sc.stop()

输出结果:

1, 2, 3, 4, 5, 6

<class 'list'>

②reduce算子

功能:

将 RDD 中的元素两两应用指定的聚合函数,最终合并为一个值,适用于需要归约操作的场景。

用法:

rdd.reduce(lambda a, b: a + b)

代码语言:python
代码运行次数:0
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])

# reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda a,b:a+b)
print(num)
sc.stop()

输出结果:

15

【分析】

③take算子

功能:

从 RDD 中获取指定数量的元素,以列表形式返回,同时不会将所有数据传回驱动。如果指定的元素数量超出 RDD 元素数量,则返回所有元素。

用法:

rdd.take(n)

代码语言:python
代码运行次数:0
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])
# take算子,取出RDD前N个元素并组成list返回
take_list=rdd.take(3)
print(take_list)
sc.stop()

输出结果:

1, 2, 3

④count算子

功能:

返回 RDD 中元素的总个数。

用法:

rdd.count()

代码语言:python
代码运行次数:0
复制
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象
conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc=SparkContext(conf=conf)
# 准备RDD
rdd=sc.parallelize([1,2,3,4,5,])
# count算子,统计rdd内有多少条数据,返回值为数字
num_count=rdd.count()
print(f"rdd内有{num_count}个元素")
sc.stop()

输出结果:

rdd内有5个元素

⑤saveAsTextFile算子

功能:

将 RDD 中的数据写入文本文件中。

用法:

rdd.saveAsTextFile(path)

调用保存文件的算子,需配置Hadoop依赖,配置方法如下:

  1. 下载Hadoop安装包: 下载网址http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
  2. 将Hadoop安装包解压到电脑任意位置
  3. 在Python代码中配置os模块: os.environ‘HADOOP_HOME’ = ‘HADOOP解压文件夹路径’
  4. 下载winutils.exe: 下载网址https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe
  5. 将winutils.exe放入Hadoop解压文件夹的bin目录内
  6. 下载hadoop.dll: 下载网址https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll
  7. 将hadoop.dll放入:C:/Windows/System32 文件夹内
代码语言:python
代码运行次数:0
复制
from pyspark import SparkConf, SparkContext
# os用于操作系统级功能,这里用来设置环境变量
import os
# 指定 PySpark 使用的 Python 解释器路径
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
# 指定 Hadoop 的安装目录
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备RDD1,传入numSlices参数为1,数据集划分为一个切片
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)

# 准备RDD2,传入numSlices参数为1,数据集划分为一个切片
rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)

# 准备RDD3,传入numSlices参数为1,数据集划分为一个切片
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)

# 输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

打开output2文本文件,输出结果如下:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 一、PySpark入门
  • 二、数据输入
  • 三、数据输出
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档