Hello,

I am trying to set up a simple Spark Streaming (Python) + Kafka example on a single-machine deployment. The Kafka broker is on the same machine (localhost:1281), and I am using Spark version spark-1.5.2-bin-hadoop2.6.

Python code:

   ...
   ssc = StreamingContext(sc, 1)
   ...
   lines = KafkaUtils.createDirectStream(
       ssc, ["test"], {"metadata.broker.list": "localhost:1281"})
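In case the surrounding code matters, the whole script boils down to the following. The word-count part is my paraphrase of the standard streaming example (names like `counts` are not verbatim from my file); only the `ssc`/`createDirectStream` lines are exactly what I showed above. It fails on the `createDirectStream` call before any of the rest runs, and it needs a running broker, so it is a sketch of the setup rather than something runnable standalone:

```python
# my_kafka_streaming_wordcount.py (stripped down)
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)  # 1-second batches

# This is the call that blows up:
lines = KafkaUtils.createDirectStream(
    ssc, ["test"], {"metadata.broker.list": "localhost:1281"})

# Standard word count over the Kafka (key, value) pairs
counts = (lines.map(lambda kv: kv[1])
               .flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()
```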


So I try:

   spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar \
       my_kafka_streaming_wordcount.py

OR

   spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.5.2 \
       my_kafka_streaming_wordcount.py

(my Kafka version is 2.11-0.9.0.0)

OR

   pyspark --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar ...
   [then import the same modules and type the same lines in the shell]


and in every case I end up with:

   15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
   15/12/17 19:45:00 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
   Traceback (most recent call last):
     File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line 80, in <module>
       lines = KafkaUtils.createDirectStream(ssc, ["test"], {"metadata.broker.list":"localhost:1281"})
     File "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 130, in createDirectStream
   py4j.protocol.Py4JJavaError: An error occurred while calling o29.createDirectStream.
   : java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
        at kafka.api.RequestKeys$.<init>(RequestKeys.scala:39)
        at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala)
        at kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:53)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

Am I missing something?

Thanks in advance,
Chris M.

