Fixing the net.jpountz.lz4.LZ4BlockInputStream Error When Using Spark Streaming with Python
Published: 2018-09-12T15:00:03
I have been testing Spark Streaming these past few days, and connecting to Kafka kept failing with this error:
    java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
        at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
        at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
        at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:124)
        at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:50)
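A NoSuchMethodError like this usually means two versions of the lz4 library ended up on the classpath, and an older one that lacks the (InputStream, boolean) constructor Spark calls is being loaded first (the kafka-0-8 assembly bundles an older net.jpountz lz4, which is a likely culprit). As a quick check, the sketch below asks the JVM which jar the class was actually loaded from; it relies on sc._jvm, PySpark's internal py4j gateway, so treat it as a debugging aid rather than a supported API:

    # Run in a pyspark shell with an active SparkContext named sc.
    # sc._jvm is PySpark's internal py4j gateway into the driver JVM.
    cls = sc._jvm.java.lang.Class.forName("net.jpountz.lz4.LZ4BlockInputStream")
    # Prints the location (jar path) the class was loaded from.
    print(cls.getProtectionDomain().getCodeSource().getLocation())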
Consuming data that a colleague had written into Kafka with Java worked fine, but data produced through the client tool or NiFi always hit this error.
At first I kept suspecting a jar version problem, but even the latest spark-streaming-kafka-0-8-assembly_2.10-2.2.2.jar gave the same error.
I finally tracked down a document showing that setting this parameter solves it:
    .config("spark.io.compression.codec", "snappy")
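Note that switching the codec to snappy sidesteps the conflicting lz4 classes rather than resolving the version clash itself. To confirm the setting actually took effect, a minimal check on a running SparkContext (assuming it is named sc):

    # Print the IO compression codec the running context is using.
    print(sc.getConf().get("spark.io.compression.codec"))  # expect: snappy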
After modifying the wordcount example accordingly, it worked:
    from __future__ import print_function  # lets print(..., file=...) work on Python 2
    import sys  # needed below for sys.argv and sys.stderr

    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
            exit(-1)
        # sc = SparkContext(appName="PythonStreamingKafkaWordCount")
        conf = SparkConf().setAppName("PythonStreamingKafkaWordCount") \
                          .set('spark.io.compression.codec', 'snappy')
        sc = SparkContext(conf=conf)
        ssc = StreamingContext(sc, 20)
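The listing above stops right after the StreamingContext is created; the rest of the job is unchanged from Spark's stock kafka_wordcount.py example. For completeness, a sketch of the remaining lines (the group id "spark-streaming-consumer" is just the example's default, not part of the fix):

        # Read from Kafka through the ZooKeeper quorum and topic passed on the command line.
        zkQuorum, topic = sys.argv[1:]
        kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

        # Classic word count over each 20-second batch: split, pair, reduce, print.
        lines = kvs.map(lambda x: x[1])
        counts = lines.flatMap(lambda line: line.split(" ")) \
                      .map(lambda word: (word, 1)) \
                      .reduceByKey(lambda a, b: a + b)
        counts.pprint()

        ssc.start()
        ssc.awaitTermination()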
Submit command:
    bin/spark-submit \
        --jars /pythontest/scripts/spark-streaming-kafka-0-8-assembly_2.10-2.2.2.jar \
        examples/src/main/python/streaming/kafka_wordcount30.py testnode:2181 sengtest
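If editing the script is not convenient, the same property can also be passed at submit time; --conf is a standard spark-submit flag, and this should be equivalent to setting it in SparkConf (a sketch, not tested here):

    bin/spark-submit \
        --conf spark.io.compression.codec=snappy \
        --jars /pythontest/scripts/spark-streaming-kafka-0-8-assembly_2.10-2.2.2.jar \
        examples/src/main/python/streaming/kafka_wordcount30.py testnode:2181 sengtest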
With that, the problem was finally solved.

