前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PySpark数据计算

PySpark数据计算

原创
作者头像
Heaven645
修改2024-08-14 00:50:28
1260
修改2024-08-14 00:50:28
举报
文章被收录于专栏:Python学习

前言

在大数据处理的时代,Apache Spark以其高效的数据处理能力和灵活的编程模型,成为了数据科学家和工程师的热门选择。PySpark作为Spark的Python接口,使得数据处理和分析更加直观和便捷。本文详细讲解了PySpark中的常用RDD算子,包括map、flatMap、reduceByKey、filter、distinct和sortBy。

在 PySpark 中,所有的数据计算都是基于 RDD(弹性分布式数据集)对象进行的。RDD 提供了丰富的成员方法(算子)来执行各种数据处理操作。

一、map算子

定义:

map算子会对RDD中的每个元素应用一个用户定义的函数,并返回一个新的 RDD。

语法:

new_rdd = rdd.map(func)

参数func为一个函数,该函数接受单个输入参数,并返回一个输出值,其函数表示法为f:(T) → U

  • f:表示这是一个函数(方法)
  • T:表示传入参数的类型,可以是任意类型
  • U:表示返回值的类型,可以是任意类型
  • (T)-U:表示该方法接受一个参数(类型为 T),返回值的类型为 U
代码语言:python
代码运行次数:0
复制
import os
from pyspark import SparkConf, SparkContext
# os.environ['PYSPARK_PYTHON'] =“自己电脑Python.exe的安装路径”,用于指定Python解释器
os.environ['PYSPARK_PYTHON'] = "D:Study\Paython\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10
def func(data):
    return data * 10
    
print(rdd2.collect())

输出结果: 10,20,30,40,50

【分析】

rdd.map(func) 创建一个新的RDD对象rdd2,其中每个元素都会通过map算子应用函数 func。因此,原始 RDD 中的每个元素(1, 2, 3, 4, 5)都会依次被传入 func 函数并处理:

func(1) 产生 10

func(2) 产生 20

func(3) 产生 30

func(4) 产生 40

func(5) 产生 50

结果是新的RD 对象rdd2 ,包含的元素为 10, 20, 30, 40, 50。

【拓展】

链式调用:在编程中将多个方法或函数的调用串联在一起的方式。

在 PySpark 中,链式调用非常常见,通常用于对 RDD 进行一系列变换或操作。通过链式调用,开发者可以在一条语句中连续执行多个操作,不需要将每个操作的结果存储在一个中间变量中,从而提高代码的简洁性和可读性。

例如:

代码语言:python
代码运行次数:0
复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10,然后都加上5
# 链式调用
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
print(rdd2.collect())

输出结果: 15, 25, 35, 45, 55

【分析】

第一个map算子接收一个 lambda 函数,这个函数将传入的每个元素乘以 10;第二个map算子在第一个map的结果上再次调用新的 lambda 函数,每个元素再加上 5。处理后的结果为:10 + 5, 20 + 5, 30 + 5, 40 + 5, 50 + 5,即 15, 25, 35, 45, 55。

二、flatMap算子

定义:

flatMap算子将输入RDD中的每个元素映射到一个序列,然后将所有序列扁平化为一个单独的RDD。简单来说,就是对rdd执行map操作,然后进行解除嵌套操作。

语法:

new_rdd = rdd.flatMap(func)

代码语言:python
代码运行次数:0
复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(["hi python","Hello world","Happy day"])
# 需求将RDD数据里面的单词一个个提取出来
rdd2=rdd.map(lambda  x:x.split(" "))
print(rdd2.collect())
sc.stop()

输出结果:

["hi python","Hello world","Happy day"]

【分析】

map算子执行过程如下:

对于第一个元素 "hi python",通过 split(" ")得到的结果是 "hi", "python";

对于第二个元素 "Hello world",通过 split(" ")得到的结果是 "Hello", "world";

对于第三个元素 "Happy day",通过 split(" ")得到的结果是 "Happy", "day";

显而易见,输出的结果不满足我们的需求,我们运用flatMap算子,将rdd2=rdd.map(lambda x:x.split(" "))改为如下代码后

代码语言:python
代码运行次数:0
复制
rdd2=rdd.flatmap(lambda  x:x.split(" "))

输出结果:

'hi', 'python', 'Hello', 'world', 'Happy', 'day'

flatMap算子会将结果扁平化为单一列表,适合于需要展开嵌套结构的场景。

三、reduceByKey算子

定义:

reduceByKey算子用于将具有相同键的值进行合并,并通过指定的聚合函数生成一个新的键值对 RDD。

语法:

new_rdd = rdd.reduceByKey(func)

参数func是一个用于合并两个相同键的值的函数,其接收两个相同类型的参数并返回一个相同类型的值,其函数表示法为f:(V,V)→>V

  • f: 函数的名称或标识符
  • (V, V):表示函数接收两个相同类型的参数
  • → V:表示函数的返回值类型
代码语言:python
代码运行次数:0
复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# reduceByKey算子
rdd=sc.parallelize([('男',99),('男',88),('女',99),('女',66)])
# 求男生和女生两个组的成绩之和
rdd2=rdd.reduceByKey(lambda a,b:a+b)
print(rdd2.collect())
sc.stop()

输出结果:

('男',187), ('女',165)

【分析】

reduceByKey算子根据每个不同的键调用匿名函数 lambda a, b: a + b,将其接受两个参数相加。

对于键 '男':

首先处理到的值是 99,然后是 88;

使用 lambda a, b: a + b,即 99 + 88 = 187。

对于键 '女':

首先处理到的值是 99,然后是 66;

使用 lambda a, b: a + b,即 99 + 66 = 165。

四、filter算子

定义:

filter算子根据给定的布尔函数过滤RDD中的元素,返回一个只包含满足条件的元素的新RDD。

语法:

new_rdd = rdd.filter(func)

参数func是一个函数,用于接收 RDD 中的每个元素,并返回一个布尔值(True 或 False)。

  • 如果返回 True,则该元素会被保留在新 RDD 中
  • 如果返回 False,则该元素会被过滤掉
代码语言:python
代码运行次数:0
复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# filter算子
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 过滤RDD数据中的奇数,仅保留偶数
rdd2=rdd.filter(lambda num:num%2==0)
print(rdd2.collect())
sc.stop()

输出结果:

2, 4

五、distinct算子

定义:

distinct算子对RDD数据进行去重,返回一个新的RDD。

语法:

new_rdd = rdd.distinct()

代码语言:python
代码运行次数:0
复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# distinct算子
rdd = sc.parallelize([1, 2, 2, 5, 5, 6])
# 对RDD数据进行去重
rdd2=rdd.distinct()
print(rdd2.collect())
sc.stop()

输出结果:

1, 2, 5, 6

六、sortBy算子

定义:

sortBy算子根据指定的键对元素进行排序。

语法:

new_rdd = rdd.sortBy(func, ascending=True, numPartitions=None)

  • 参数:func:用于指定排序依据的函数
  • 参数ascending:指定排序的顺序,True 表示升序排序(默认值);False 表示降序排序
  • 参数numPartitions:可选参数,指定分区数
代码语言:python
代码运行次数:0
复制
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 创建了一个包含四个元组的 RDD
rdd=sc.parallelize([('小明',99),('小红',88),('小城',99),('小李',66)])
# 使用 sortBy 方法将 RDD 按照分数(元组中的第二个元素)进行降序排序
rdd2=rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(rdd2.collect())
sc.stop()

输出结果:

('小明', 99), ('小城', 99), ('小红', 88), ('小李', 66)

【注意】

如果多个元素具有相同的键(如这里的 99),sortBy算子会保持这些元素在原始 RDD 中的相对顺序(稳定排序)。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 一、map算子
  • 二、flatMap算子
  • 三、reduceByKey算子
  • 四、filter算子
  • 五、distinct算子
  • 六、sortBy算子
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档