在大数据处理的时代,Apache Spark以其高效的数据处理能力和灵活的编程模型,成为了数据科学家和工程师的热门选择。PySpark作为Spark的Python接口,使得数据处理和分析更加直观和便捷。本文详细讲解了PySpark中的常用RDD算子,包括map、flatMap、reduceByKey、filter、distinct和sortBy。
在 PySpark 中,所有的数据计算都是基于 RDD(弹性分布式数据集)对象进行的。RDD 提供了丰富的成员方法(算子)来执行各种数据处理操作。
定义:
map
算子会对RDD中的每个元素应用一个用户定义的函数,并返回一个新的 RDD。
语法:
new_rdd = rdd.map(func)
参数func
为一个函数,该函数接受单个输入参数,并返回一个输出值,其函数表示法为f:(T) → U
import os
from pyspark import SparkConf, SparkContext
# os.environ['PYSPARK_PYTHON'] =“自己电脑Python.exe的安装路径”,用于指定Python解释器
os.environ['PYSPARK_PYTHON'] = "D:Study\Paython\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10
def func(data):
return data * 10
print(rdd2.collect())
输出结果: 10,20,30,40,50
【分析】
rdd.map(func) 创建一个新的RDD对象rdd2,其中每个元素都会通过map
算子应用函数 func
。因此,原始 RDD 中的每个元素(1, 2, 3, 4, 5)都会依次被传入 func
函数并处理:
func(1) 产生 10
func(2) 产生 20
func(3) 产生 30
func(4) 产生 40
func(5) 产生 50
结果是新的RD 对象rdd2 ,包含的元素为 10, 20, 30, 40, 50。
【拓展】
链式调用:在编程中将多个方法或函数的调用串联在一起的方式。
在 PySpark 中,链式调用非常常见,通常用于对 RDD 进行一系列变换或操作。通过链式调用,开发者可以在一条语句中连续执行多个操作,不需要将每个操作的结果存储在一个中间变量中,从而提高代码的简洁性和可读性。
例如:
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10,然后都加上5
# 链式调用
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
print(rdd2.collect())
输出结果: 15, 25, 35, 45, 55
【分析】
第一个map
算子接收一个 lambda 函数,这个函数将传入的每个元素乘以 10;第二个map
算子在第一个map
的结果上再次调用新的 lambda 函数,每个元素再加上 5。处理后的结果为:10 + 5, 20 + 5, 30 + 5, 40 + 5, 50 + 5,即 15, 25, 35, 45, 55。
定义:
flatMap
算子将输入RDD中的每个元素映射到一个序列,然后将所有序列扁平化为一个单独的RDD。简单来说,就是对rdd执行map操作,然后进行解除嵌套操作。
语法:
new_rdd = rdd.flatMap(func)
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(["hi python","Hello world","Happy day"])
# 需求将RDD数据里面的单词一个个提取出来
rdd2=rdd.map(lambda x:x.split(" "))
print(rdd2.collect())
sc.stop()
输出结果:
["hi python","Hello world","Happy day"]
【分析】
map
算子执行过程如下:
对于第一个元素 "hi python",通过 split(" ")得到的结果是 "hi", "python";
对于第二个元素 "Hello world",通过 split(" ")得到的结果是 "Hello", "world";
对于第三个元素 "Happy day",通过 split(" ")得到的结果是 "Happy", "day";
显而易见,输出的结果不满足我们的需求,我们运用flatMap
算子,将rdd2=rdd.map(lambda x:x.split(" "))
改为如下代码后
rdd2=rdd.flatmap(lambda x:x.split(" "))
输出结果:
'hi', 'python', 'Hello', 'world', 'Happy', 'day'
flatMap
算子会将结果扁平化为单一列表,适合于需要展开嵌套结构的场景。
定义:
reduceByKey
算子用于将具有相同键的值进行合并,并通过指定的聚合函数生成一个新的键值对 RDD。
语法:
new_rdd = rdd.reduceByKey(func)
参数func
是一个用于合并两个相同键的值的函数,其接收两个相同类型的参数并返回一个相同类型的值,其函数表示法为f:(V,V)→>V
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# reduceByKey算子
rdd=sc.parallelize([('男',99),('男',88),('女',99),('女',66)])
# 求男生和女生两个组的成绩之和
rdd2=rdd.reduceByKey(lambda a,b:a+b)
print(rdd2.collect())
sc.stop()
输出结果:
('男',187), ('女',165)
【分析】
reduceByKey
算子根据每个不同的键调用匿名函数 lambda a, b: a + b,将其接受两个参数相加。
对于键 '男':
首先处理到的值是 99,然后是 88;
使用 lambda a, b: a + b,即 99 + 88 = 187。
对于键 '女':
首先处理到的值是 99,然后是 66;
使用 lambda a, b: a + b,即 99 + 66 = 165。
定义:
filter
算子根据给定的布尔函数过滤RDD中的元素,返回一个只包含满足条件的元素的新RDD。
语法:
new_rdd = rdd.filter(func)
参数func
是一个函数,用于接收 RDD 中的每个元素,并返回一个布尔值(True 或 False)。
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# filter算子
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 过滤RDD数据中的奇数,仅保留偶数
rdd2=rdd.filter(lambda num:num%2==0)
print(rdd2.collect())
sc.stop()
输出结果:
2, 4
定义:
distinct
算子对RDD数据进行去重,返回一个新的RDD。
语法:
new_rdd = rdd.distinct()
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# distinct算子
rdd = sc.parallelize([1, 2, 2, 5, 5, 6])
# 对RDD数据进行去重
rdd2=rdd.distinct()
print(rdd2.collect())
sc.stop()
输出结果:
1, 2, 5, 6
定义:
sortBy
算子根据指定的键对元素进行排序。
语法:
new_rdd = rdd.sortBy(func, ascending=True, numPartitions=None)
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\桌面\Study\Paython\learning\pythonProject\.venv\Scripts\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 创建了一个包含四个元组的 RDD
rdd=sc.parallelize([('小明',99),('小红',88),('小城',99),('小李',66)])
# 使用 sortBy 方法将 RDD 按照分数(元组中的第二个元素)进行降序排序
rdd2=rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(rdd2.collect())
sc.stop()
输出结果:
('小明', 99), ('小城', 99), ('小红', 88), ('小李', 66)
【注意】
如果多个元素具有相同的键(如这里的 99),sortBy
算子会保持这些元素在原始 RDD 中的相对顺序(稳定排序)。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。