Transformation: transformations are lazily evaluated. When one RDD is transformed into another, nothing is computed immediately; Spark merely records the logical operation on the dataset.
Spark's action operations are covered separately at https://www.168seo.cn/pyspark/24809.html
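A minimal sketch of this laziness (assuming a local SparkContext, set up as in the examples below):

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf())

# map() is a transformation: nothing is computed here,
# Spark only records the operation
lazy = sc.parallelize(range(5)).map(lambda x: x * 2)

# collect() is an action: it triggers the actual computation
print(lazy.collect())  # [0, 2, 4, 6, 8]

sc.stop()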
map
Applies a function to each element of the dataset and returns a new distributed dataset.
Return a new RDD by applying a function to each element of this RDD.

>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
from pyspark import SparkConf, SparkContext

# configuration
conf = SparkConf()  # .setAppName("spark demo").setMaster("local[2]")
sc = SparkContext(conf=conf)

data = range(10)
print(list(data))

r1 = sc.parallelize(data)
r2 = r1.map(lambda x: x + 1)
print(r2.collect())

sc.stop()
The output is:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

filter
Returns a new distributed dataset containing only the elements for which the function returns True.
Return a new RDD containing only the elements that satisfy a predicate.

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]

from pyspark import SparkConf, SparkContext

# configuration
conf = SparkConf()  # .setAppName("spark demo").setMaster("local[2]")
sc = SparkContext(conf=conf)

data = range(10)
print(list(data))

r1 = sc.parallelize(data)
r2 = r1.filter(lambda x: x > 5)
print(r2.collect())  # [6, 7, 8, 9]

sc.stop()

flatMap
Return a new RDD by first applying a function to all elements of this
RDD, and then flattening the results.
from pyspark import SparkConf, SparkContext

# configuration
conf = SparkConf()  # .setAppName("spark demo").setMaster("local[2]")
sc = SparkContext(conf=conf)

data = ["hello zeropython", "hello 168seo.cn"]

r1 = sc.parallelize(data)
r2 = r1.flatMap(lambda x: x.split(" "))  # flattened: one element per word
r3 = r1.map(lambda x: x.split(" "))      # not flattened: one list per line
print(r2.collect())  # ['hello', 'zeropython', 'hello', '168seo.cn']
print(r3.collect())  # [['hello', 'zeropython'], ['hello', '168seo.cn']]

sc.stop()

groupByKey
Groups the values that share the same key into a single sequence.
# -*- coding: utf-8 -*-
"""
@Time: 2018/9/5
@Author: songhao
@WeChat official account: zeropython
@File: saprk_map.py
"""
from operator import add
from pyspark import SparkConf, SparkContext

# configuration
conf = SparkConf()  # .setAppName("spark demo").setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")

data = ["hello zeropython", "hello 168seo.cn"]

r1 = sc.parallelize(data)
r2 = r1.flatMap(lambda x: x.split(" ")).map(lambda y: (y, 1))
print("r2", r2.collect())

r3 = r2.groupByKey()  # the values of each key are grouped into an iterable
print("r3", r3.collect())

r4 = r3.map(lambda x: {x[0]: list(x[1])})
print("r4", r4.collect())

# reduceByKey(add) produces the same counts and is usually faster,
# because values are combined before the shuffle
print(r2.reduceByKey(add).collect())

sc.stop()

groupBy
groupBy splits the data into multiple groups according to the rule given by an anonymous function. For example, the following code splits intRDD into even and odd numbers:
intRDD = sc.parallelize([3, 1, 2, 5, 5])
result = intRDD.groupBy(lambda x: x % 2).collect()
print(sorted([(x, sorted(y)) for (x, y) in result]))
The output is:
[(0, [2]), (1, [1, 3, 5, 5])]
reduceByKey
Brings the values with the same key together and combines them with the given function.
# -*- coding: utf-8 -*-
"""
@Time: 2018/9/5
@Author: songhao
@WeChat official account: zeropython
@File: saprk_map.py
"""
from pyspark import SparkConf, SparkContext

# configuration
conf = SparkConf()  # .setAppName("spark demo").setMaster("local[2]")
sc = SparkContext(conf=conf)

data = ["hello zeropython", "hello 168seo.cn"]

r1 = sc.parallelize(data)
r2 = r1.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))
print("r2", r2.collect())

r3 = r2.reduceByKey(lambda x, y: x + y)  # sum the counts of each word
print("r3", r3.collect())

sc.stop()

sortByKey
Sorts this RDD, which is assumed to consist of (key, value) pairs.
# -*- coding: utf-8 -*-
"""
@Time: 2018/9/5
@Author: songhao
@WeChat official account: zeropython
@File: saprk_map.py
"""
from pyspark import SparkConf, SparkContext

# configuration
conf = SparkConf()  # .setAppName("spark demo").setMaster("local[2]")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")  # other levels: "FATAL", "WARN"

data = ["hello zeropython", "hwlldsf world", "168seo.cn",
        "168seo.cn", "hello 168seo.cn"]

r1 = sc.parallelize(data)
# reduceByKey turns [("a", 1), ("a", 1), ("a", 1)] into [("a", 3)];
# sortByKey then orders the pairs by key. The lambda passed to it in the
# original was silently consumed as the `ascending` flag, so it is dropped.
r2 = r1.flatMap(lambda x: x.split(" ")) \
       .map(lambda y: (y, 1)) \
       .reduceByKey(lambda x, y: x + y) \
       .sortByKey()
print(r2.collect())

sc.stop()
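Note that sortByKey orders by the key (the word), not by the count. To rank words by frequency instead, a small sketch using sortBy (reusing r2 from the script above, run before sc.stop()):

# order the (word, count) pairs by count, most frequent first
by_count = r2.sortBy(lambda kv: kv[1], ascending=False)
print(by_count.collect())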

union
Return the union of this RDD and another one.

>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
distinct
Return a new RDD containing the distinct elements in this RDD.

>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]
join
Inner join on key: only keys present in both RDDs appear in the result.
>>> a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
>>> b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
>>> a.join(b).collect()
[('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))]
leftOuterJoin
Keeps every key from the left RDD; keys with no match on the right pair with None.
>>> a.leftOuterJoin(b).collect()
[('F', ('f1', None)), ('F', ('f2', None)), ('D', ('d1', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))]
rightOuterJoin
Keeps every key from the right RDD; keys with no match on the left pair with None.
>>> a.rightOuterJoin(b).collect()
[('E', (None, 'e1')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))]
fullOuterJoin
Keeps every key from both RDDs, filling the missing side with None.
>>> a.fullOuterJoin(b).collect()
[('F', ('f1', None)), ('F', ('f2', None)), ('D', ('d1', None)), ('E', (None, 'e1')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('A', ('a1', 'a2'))]
randomSplit
randomSplit randomly divides a collection into multiple RDDs according to the given weights. For example, split intRDD into two RDDs in a 0.4 : 0.6 ratio and print them:
intRDD = sc.parallelize([3, 1, 2, 5, 5])
stringRDD = sc.parallelize(['Apple', 'Orange', 'Grape', 'Banana', 'Apple'])
sRDD = intRDD.randomSplit([0.4, 0.6])
print(len(sRDD))
print(sRDD[0].collect())
print(sRDD[1].collect())
The output is (the split is random, so your result may differ):
2
[3, 1]
[2, 5, 5]
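randomSplit also accepts an optional seed, which makes the split reproducible across runs — a small sketch:

# same weights, but the same split on every run
sRDD = intRDD.randomSplit([0.4, 0.6], seed=42)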
Transformations on multiple RDDs
RDDs also support operations that combine several RDDs. Here we define three RDDs:
intRDD1 = sc.parallelize([3, 1, 2, 5, 5])
intRDD2 = sc.parallelize([5, 6])
intRDD3 = sc.parallelize([2, 7])
Union
Use the union function to compute the union:
print(intRDD1.union(intRDD2).union(intRDD3).collect())
The output is:
[3, 1, 2, 5, 5, 5, 6, 2, 7]
Intersection
Use the intersection function to compute the intersection:
print(intRDD1.intersection(intRDD2).collect())
The two collections share only the element 5, so the output is:
[5]
Subtraction
Use the subtract function to compute the set difference (the elements of one RDD with the elements of another removed):
print(intRDD1.subtract(intRDD2).collect())
The element the two RDDs share is 5, so after removing it the output contains 1, 2, and 3 (order may vary):
[2, 1, 3]
Cartesian product
In mathematics, the Cartesian product (also called the direct product) of two sets X and Y, written X × Y, is the set of all ordered pairs whose first member comes from X and whose second member comes from Y. For example, if A = {a, b} and B = {0, 1, 2}, then A × B = {(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}.
Use the cartesian function to compute the Cartesian product:
print(intRDD1.cartesian(intRDD2).collect())
The two RDDs have 5 and 2 elements respectively, so the result has 10 elements:
[(3, 5), (3, 6), (1, 5), (1, 6), (2, 5), (2, 6), (5, 5), (5, 6), (5, 5), (5, 6)]

