Action(执行):触发Spark作业的运行,真正触发转换算子的计算
Pyspark rdd 常用的转换 Transformation Pyspark(二)
https://www.168seo.cn/pyspark/24806.html

1 2 3 |
intRDD = sc.parallelize([3,1,2,5,5]) stringRDD = sc.parallelize(['Apple','Orange','Grape','Banana','Apple']) |
基本“动作”运算
读取元素
可以使用下列命令读取RDD内的元素,这是Actions运算,所以会马上执行:
1 2 3 4 5 6 7 8 9 |
#取第一条数据 print (intRDD.first()) #取前两条数据 print (intRDD.take(2)) #升序排列,并取前3条数据 print (intRDD.takeOrdered(3)) #降序排列,并取前3条数据 print (intRDD.takeOrdered(3,lambda x:-x)) |
输出为:
1 2 3 4 5 |
3 [3, 1] [1, 2, 3] [5, 5, 3] |

统计功能
可以将RDD内的元素进行统计运算:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#统计 print (intRDD.stats()) #最小值 print (intRDD.min()) #最大值 print (intRDD.max()) #标准差 print (intRDD.stdev()) #计数 print (intRDD.count()) #求和 print (intRDD.sum()) #平均 print (intRDD.mean()) |
输出为:
RDD Key-Value基本“转换”运算
Spark RDD支持键值对运算,Key-Value运算时mapreduce运算的基础,本节介绍RDD键值的基本“转换”运算。
初始化
我们用元素类型为tuple元组的数组初始化我们的RDD,这里,每个tuple的第一个值将作为键,而第二个元素将作为值。
作为值
1 2 |
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) |
得到key和value值
可以使用keys和values函数分别得到RDD的键数组和值数组:
1 2 3 |
print (kvRDD1.keys().collect()) print (kvRDD1.values().collect()) |
输出为:

筛选元素
可以按照键进行元素筛选,也可以通过值进行元素筛选,和之前的一样,使用filter函数,这里要注意的是,虽然RDD中是以键值对形式存在,但是本质上还是一个二元组,二元组的第一个值代表键,第二个值代表值,所以按照如下的代码既可以按照键进行筛选,我们筛选键值小于5的数据:
1 2 |
print (kvRDD1.filter(lambda x:x[0] < 5).collect()) |
输出为:
1 2 |
[(3, 4), (3, 6), (1, 2)] |
同样,将x[0]替换为x[1]就是按照值进行筛选,我们筛选值小于5的数据:
1 2 |
print (kvRDD1.filter(lambda x:x[1] < 5).collect()) |
输出为:
1 2 |
[(3, 4), (1, 2)] |
值运算
我们可以使用mapValues方法处理value值,下面的代码将value值进行了平方处理:
1 2 |
print (kvRDD1.mapValues(lambda x:x**2).collect()) |
输出为:
1 2 |
[(3, 16), (3, 36), (5, 36), (1, 4)] |
按照key排序
可以使用sortByKey按照key进行排序,传入参数的默认值为true,是按照从小到大排序,也可以传入参数false,表示从大到小排序:
1 2 3 4 |
print (kvRDD1.sortByKey().collect()) print (kvRDD1.sortByKey(True).collect()) print (kvRDD1.sortByKey(False).collect()) |
输出为:
1 2 3 4 |
[(1, 2), (3, 4), (3, 6), (5, 6)] [(1, 2), (3, 4), (3, 6), (5, 6)] [(5, 6), (3, 4), (3, 6), (1, 2)] |
合并相同key值的数据
使用reduceByKey函数可以对具有相同key值的数据进行合并。比如下面的代码,由于RDD中存在(3,4)和(3,6)两条key值均为3的数据,他们将被合为一条数据:
1 2 |
print (kvRDD1.reduceByKey(lambda x,y:x+y).collect()) |
输出为
1 2 |
[(1, 2), (3, 10), (5, 6)] |
多个RDD Key-Value“转换”运算
初始化
首先我们初始化两个k-v的RDD:
1 2 3 |
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) kvRDD2 = sc.parallelize([(3,8)]) |
内连接运算
join运算可以实现类似数据库的内连接,将两个RDD按照相同的key值join起来,kvRDD1与kvRDD2的key值唯一相同的是3,kvRDD1中有两条key值为3的数据(3,4)和(3,6),而kvRDD2中只有一条key值为3的数据(3,8),所以join的结果是(3,(4,8)) 和(3,(6,8)):
1 2 |
print (kvRDD1.join(kvRDD2).collect()) |
输出为:
1 2 |
[(3, (4, 8)), (3, (6, 8))] |
左外连接
使用leftOuterJoin可以实现类似数据库的左外连接,如果kvRDD1的key值对应不到kvRDD2,就会显示None
1 2 |
print (kvRDD1.leftOuterJoin(kvRDD2).collect()) |
输出为:
1 2 |
[(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))] |
右外连接
使用rightOuterJoin可以实现类似数据库的右外连接,如果kvRDD2的key值对应不到kvRDD1,就会显示None
1 2 |
print (kvRDD1.rightOuterJoin(kvRDD2).collect()) |
输出为:
1 2 |
[(3, (4, 8)), (3, (6, 8))] |
删除相同key值数据
使用subtractByKey运算会删除相同key值得数据:
1 2 |
print (kvRDD1.subtractByKey(kvRDD2).collect()) |
结果为:
1 2 |
[(1, 2), (5, 6)] |
Key-Value“动作”运算
读取数据
可以使用下面的几种方式读取RDD的数据:
1 2 3 4 5 6 7 8 9 |
#读取第一条数据 print (kvRDD1.first()) #读取前两条数据 print (kvRDD1.take(2)) #读取第一条数据的key值 print (kvRDD1.first()[0]) #读取第一条数据的value值 print (kvRDD1.first()[1]) |
输出为:
1 2 3 4 5 |
(3, 4) [(3, 4), (3, 6)] 3 4 |
按key值统计:
使用countByKey函数可以统计各个key值对应的数据的条数:
1 2 |
print (kvRDD1.countByKey().collect()) |
输出为:
1 2 |
defaultdict(<type 'int'>, {1: 1, 3: 2, 5: 1}) |
lookup查找运算
使用lookup函数可以根据输入的key值来查找对应的Value值:
1 2 |
print (kvRDD1.lookup(3)) |
输出为:
1 2 |
[4, 6] |
持久化操作
spark RDD的持久化机制,可以将需要重复运算的RDD存储在内存中,以便大幅提升运算效率,有两个主要的函数:
持久化
使用persist函数对RDD进行持久化:
1 2 |
kvRDD1.persist() |
在持久化的同时我们可以指定持久化存储等级:

首先我们导入相关函数:
1 2 |
在scala中可以直接使用上述的持久化等级关键词,但是在pyspark中封装为了一个类,
StorageLevel类,并在初始化时指定一些参数,通过不同的参数组合,可以实现上面的不同存储等级。StorageLevel类的初始化函数如下:
1 2 3 4 5 6 7 |
def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1): self.useDisk = useDisk self.useMemory = useMemory self.useOffHeap = useOffHeap self.deserialized = deserialized self.replication = replication |
那么不同的存储等级对应的参数为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False) StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2) StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False) StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2) StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False) StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2) StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1) """ .. note:: The following four storage level constants are deprecated in 2.0, since the records \ will always be serialized in Python. """ StorageLevel.MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY`` instead.""" StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel.MEMORY_ONLY_2 """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY_2`` instead.""" StorageLevel.MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK`` instead.""" StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel.MEMORY_AND_DISK_2 """.. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK_2`` instead.""" |
取消持久化
使用unpersist函数对RDD进行持久化:
1 2 |
kvRDD1.unpersist() |
整理回顾
哇,有关pyspark的RDD的基本操作就是上面这些啦,想要了解更多的盆友们可以参照官网给出的官方文档:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
今天主要介绍了两种RDD,基本的RDD和Key-Value形式的RDD,介绍了他们的几种“转换”运算和“动作”运算,整理如下:

