spark.createDataFrame(rowRdd): convert an RDD to a DataFrame
```python
from __future__ import print_function

import sys
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import Row, SparkSession


def getSparkSessionInstance(sparkConf):
    # lazily create a singleton SparkSession
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']


sc = SparkContext("local[2]", "NetWordCount")
ssc = StreamingContext(sc, 1)

topic = "connect-test"  # the Kafka topic to consume
# read data from Kafka (receiver-based stream via ZooKeeper)
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {topic: 1})
words = kvs.map(lambda x: x[1])
# words = kvs.map(lambda line: line.split(","))
# words = kvs.flatMap(lambda line: line.split(" "))


# Convert RDDs of the words DStream to DataFrame and run SQL query
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowSplitted = rdd.flatMap(lambda line: line.split(","))
        rowRdd = rowSplitted.map(lambda w: Row(word=w))
        # rowRdd = rdd.flatMap(lambda line: line.split(","))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame.
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass


words.pprint()
# foreachRDD: apply a function to each RDD generated by this DStream (stream processing)
words.foreachRDD(process)

ssc.start()
ssc.awaitTermination()
```
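One thing worth noting: when a micro-batch is empty, schema inference on the `Row` RDD can fail, which is what the bare `except: pass` above quietly swallows. Passing an explicit schema to `createDataFrame` avoids the inference step altogether. The snippet below is a minimal, self-contained sketch of that variant; the app name and sample data are made up for illustration, and only `createDataFrame`, the `word` column, and the SQL query mirror the listing above.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Standalone sketch; app name is illustrative, not from the original listing.
spark = SparkSession.builder.master("local[2]").appName("SchemaExample").getOrCreate()

# Explicit schema: a single string column named "word".
schema = StructType([StructField("word", StringType(), True)])

# With an explicit schema the rows can be plain tuples, and an empty RDD
# no longer trips up schema inference.
rdd = spark.sparkContext.parallelize([("spark",), ("kafka",), ("spark",)])
wordsDataFrame = spark.createDataFrame(rdd, schema)

wordsDataFrame.createOrReplaceTempView("words")
spark.sql("select word, count(*) as total from words group by word").show()
```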
toDF(): convert an RDD to a DataFrame
```python
>>> from pyspark.sql.types import Row
>>> def f(x):
...     rel = {}
...     rel['name'] = x[0]
...     rel['age'] = x[1]
...     return rel
...
>>> peopleDF = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()
>>> peopleDF.createOrReplaceTempView("people")  # must be registered as a temporary view before the query below can use it
>>> personsDF = spark.sql("select * from people")
>>> personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print)
Name: 19,Age:Justin
Name: 29,Age:Michael
Name: 30,Age:Andy
```
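Note the seemingly swapped output: in the PySpark version this transcript comes from, `Row(**kwargs)` sorts its fields alphabetically, so `t[0]` is actually `age` and `t[1]` is `name` (newer releases preserve insertion order). Accessing the fields by name avoids the surprise. Below is a minimal sketch of the same query written as a standalone script; the app name and the `int()` cast are my own additions, while the file path is the one used above.

```python
from pyspark.sql import SparkSession, Row

# Standalone sketch; "ToDFExample" is an illustrative app name.
spark = SparkSession.builder.master("local[2]").appName("ToDFExample").getOrCreate()
sc = spark.sparkContext

peopleDF = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt") \
    .map(lambda line: line.split(",")) \
    .map(lambda x: Row(name=x[0], age=int(x[1]))) \
    .toDF()

peopleDF.createOrReplaceTempView("people")

# Field access by name is unaffected by how Row orders its columns.
spark.sql("select * from people") \
    .rdd.map(lambda t: "Name:" + t.name + ",Age:" + str(t.age)) \
    .foreach(print)
```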

