Action(执行):触发Spark作业的运行,真正触发转换算子的计算

Pyspark rdd 常用的转换 Transformation Pyspark(二)

https://www.168seo.cn/pyspark/24806.html

Pyspark rdd 常用的执行动作 Action  Pyspark(三)-新乡seo|网站优化,网站建设_微信公众号:zeropython—昊天博客

基本“动作”运算

读取元素

可以使用下列命令读取RDD内的元素,这是Actions运算,所以会马上执行:

输出为:

Pyspark rdd 常用的执行动作 Action  Pyspark(三)-新乡seo|网站优化,网站建设_微信公众号:zeropython—昊天博客

统计功能

可以将RDD内的元素进行统计运算:

输出为:
Pyspark rdd 常用的执行动作 Action  Pyspark(三)-新乡seo|网站优化,网站建设_微信公众号:zeropython—昊天博客

RDD Key-Value基本“转换”运算

Spark RDD支持键值对运算,Key-Value运算时mapreduce运算的基础,本节介绍RDD键值的基本“转换”运算。

初始化

我们用元素类型为tuple元组的数组初始化我们的RDD,这里,每个tuple的第一个值将作为键,而第二个元素将作为值。
作为值

得到key和value值
可以使用keys和values函数分别得到RDD的键数组和值数组:

输出为:

Pyspark rdd 常用的执行动作 Action  Pyspark(三)-新乡seo|网站优化,网站建设_微信公众号:zeropython—昊天博客

筛选元素

可以按照键进行元素筛选,也可以通过值进行元素筛选,和之前的一样,使用filter函数,这里要注意的是,虽然RDD中是以键值对形式存在,但是本质上还是一个二元组,二元组的第一个值代表键,第二个值代表值,所以按照如下的代码既可以按照键进行筛选,我们筛选键值小于5的数据:

输出为:

同样,将x[0]替换为x[1]就是按照值进行筛选,我们筛选值小于5的数据:

输出为:

值运算

我们可以使用mapValues方法处理value值,下面的代码将value值进行了平方处理:

输出为:

按照key排序

可以使用sortByKey按照key进行排序,传入参数的默认值为true,是按照从小到大排序,也可以传入参数false,表示从大到小排序:

输出为:

合并相同key值的数据

使用reduceByKey函数可以对具有相同key值的数据进行合并。比如下面的代码,由于RDD中存在(3,4)和(3,6)两条key值均为3的数据,他们将被合为一条数据:

输出为

多个RDD Key-Value“转换”运算

初始化
首先我们初始化两个k-v的RDD:

内连接运算

join运算可以实现类似数据库的内连接,将两个RDD按照相同的key值join起来,kvRDD1与kvRDD2的key值唯一相同的是3,kvRDD1中有两条key值为3的数据(3,4)和(3,6),而kvRDD2中只有一条key值为3的数据(3,8),所以join的结果是(3,(4,8)) 和(3,(6,8)):

输出为:

左外连接

使用leftOuterJoin可以实现类似数据库的左外连接,如果kvRDD1的key值对应不到kvRDD2,就会显示None

输出为:

右外连接
使用rightOuterJoin可以实现类似数据库的右外连接,如果kvRDD2的key值对应不到kvRDD1,就会显示None

输出为:

删除相同key值数据

使用subtractByKey运算会删除相同key值得数据:

结果为:

Key-Value“动作”运算

读取数据
可以使用下面的几种方式读取RDD的数据:

输出为:

按key值统计:

使用countByKey函数可以统计各个key值对应的数据的条数:

输出为:

lookup查找运算

使用lookup函数可以根据输入的key值来查找对应的Value值:

输出为:

持久化操作

spark RDD的持久化机制,可以将需要重复运算的RDD存储在内存中,以便大幅提升运算效率,有两个主要的函数:

持久化

使用persist函数对RDD进行持久化:

在持久化的同时我们可以指定持久化存储等级:

Pyspark rdd 常用的执行动作 Action  Pyspark(三)-新乡seo|网站优化,网站建设_微信公众号:zeropython—昊天博客

首先我们导入相关函数:

在scala中可以直接使用上述的持久化等级关键词,但是在pyspark中封装为了一个类,
StorageLevel类,并在初始化时指定一些参数,通过不同的参数组合,可以实现上面的不同存储等级。StorageLevel类的初始化函数如下:

那么不同的存储等级对应的参数为:

取消持久化

使用unpersist函数对RDD进行持久化:

整理回顾
哇,有关pyspark的RDD的基本操作就是上面这些啦,想要了解更多的盆友们可以参照官网给出的官方文档:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

今天主要介绍了两种RDD,基本的RDD和Key-Value形式的RDD,介绍了他们的几种“转换”运算和“动作”运算,整理如下:
Pyspark rdd 常用的执行动作 Action  Pyspark(三)-新乡seo|网站优化,网站建设_微信公众号:zeropython—昊天博客