Hello,
I am trying to set up a simple example with Spark Streaming (Python) and
Kafka on a single-machine deployment.
The Kafka broker runs on the same machine (localhost:1281), and
I am using Spark version spark-1.5.2-bin-hadoop2.6.
Python code:
...
ssc = StreamingContext(sc, 1)
...
lines = KafkaUtils.createDirectStream(ssc, ["test"],
    {"metadata.broker.list": "localhost:1281"})
So I try:
spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar my_kafka_streaming_wordcount.py
OR
spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.5.2 my_kafka_streaming_wordcount.py
(my Kafka build is kafka_2.11-0.9.0.0, i.e. Kafka 0.9.0.0 built for Scala 2.11)
OR
pyspark --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar ....
[import the required modules and type the lines above]
and I end up with:
15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/12/17 19:45:00 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Traceback (most recent call last):
  File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line 80, in <module>
    lines = KafkaUtils.createDirectStream(ssc, ["test"],
        {"metadata.broker.list":"localhost:1281"})
  File "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 130, in createDirectStream
py4j.protocol.Py4JJavaError: An error occurred while calling o29.createDirectStream.
: java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
        at kafka.api.RequestKeys$.<init>(RequestKeys.scala:39)
        at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala)
        at kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:53)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
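One possibility I have not ruled out: as far as I know, the prebuilt Spark 1.5.x downloads are compiled against Scala 2.10, while the jars I pass above are the _2.11 builds, so the Scala versions of Spark and the Kafka integration jar may not match. If that is the cause, the submit command would need the 2.10 artifacts instead (an untested sketch; artifact names assumed from the usual Maven naming convention):

```shell
# Use the Kafka assembly jar built for the same Scala version as the
# prebuilt Spark binaries (believed to be Scala 2.10 for Spark 1.5.2):
spark-submit \
  --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar \
  my_kafka_streaming_wordcount.py

# or resolve the matching artifact from Maven instead:
spark-submit \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 \
  my_kafka_streaming_wordcount.py
```

As I understand it, the kafka_2.11-0.9.0.0 broker itself would not matter here, since client and broker only talk over the wire protocol; only the client-side jars need to match Spark's Scala version.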
Am I missing something?
Thanks in advance
Chris M.