lines = sc.textFile("word.txt")
pairRDD = lines.flatMap(lambda line: line.split(" ")) \ # 将数据先进行分割split,再拍平flat,形成单个的元素
.map(lambda word:(word, 1)) # 单个元素和1组成元组的形式,键值对RDD
pairRDD.foreach(print)
("hadoop", 1)
("spark", 1)
("hive", 1)
reduceByKey(func)
先通过key
进行分组,再通过value
进行func
函数的运用
pairRDD = sc.parallelize([("hadoop",1),("hive",1),("spark", 1), ("spark", 1)])
pairRDD.reduceByKey(lambda a,b: a+b).foreach(print) # 先通过key进行分组,再通过value进行func函数的运用
("spark",2)
("hive",1)
("hadoop",1)
groupByKey()
不进行聚合运算,只是将相同键的值进行合并成一个列表的形式。
pairRDD = sc.parallelize([("hadoop",1),("hive",1),("spark", 1), ("spark", 1)])
pairRDD.reduceByKey(lambda a,b: a+b).foreach(print) # 先通过key进行分组,再通过value进行func函数的运用
("spark",(1, 1)) # 不进行计算,只合并成一个元组形式
("hive",1)
("hadoop",1)
reduceByKey
可以等价于groupByKey
加上map
操作:
# 1
words = ["one","two","two", "three", "three", "three"]
wordpairRDD = sc.parallelize(words).map(lambda word:(word,1))
wordCountReduce = wordpairRDD.reduceByKey(lambda a.b:a+b)
wordCountReduce.foreach(print)
# 2
wordcountGroup = wordpairRDD.groupByKey().map(lambda t:(t[0], sum(t[1])))
wordCountGroup.foreach(print)
("one",1)
("two",2)
("three",3)
keys()/vlaues()
只取出key/value的值
pairRDD = sc.parallelize([("hadoop",1),("hive",1),("spark", 1), ("spark", 1)])
pairRDD.keys().foreach(print)
sortBykey()
对key
进行排序,默认是升序False
mapValues(func)
key
保持不变,值根据func
函数进行改变
pairRDD = sc.parallelize([("hadoop",1),("hive",1),("spark", 1), ("spark", 1)])
pairRDD1 = pairRDD.mapValues(lambda x:x+1) # 将每个值进行加1操作
pairRDD1.foreach(print)
("hadoop",2)
("hive",2)
("spark",2)
("spark",2)
join
如果两个RDD
的key
相同,将它们的value
进行合并,形成一个元组的形式。
mapValues
:只对value
进行操作,而且没有聚合操作reduceByKey(func)
:先分组,再对value
进行函数func
的聚合操作