Unless you built your own Spark distribution with Scala 2.11, you want to use the 2.10 dependency:
    --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2

On Thu, Dec 17, 2015 at 10:10 AM, Christos Mantas <cman...@cslab.ece.ntua.gr> wrote:

> Hello,
>
> I am trying to set up a simple example with Spark Streaming (Python) and
> Kafka on a single-machine deployment.
> My Kafka broker/server is also on the same machine (localhost:1281) and I
> am using Spark version spark-1.5.2-bin-hadoop2.6.
>
> Python code:
>
>     ...
>     ssc = StreamingContext(sc, 1)
>     ...
>     lines = KafkaUtils.createDirectStream(ssc, ["test"],
>         {"metadata.broker.list": "localhost:1281"})
>
> So I try
>
>     spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar my_kafka_streaming_wordcount.py
>
> or
>
>     spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.5.2 my_kafka_streaming_wordcount.py
>
> (my Kafka version is 2.11-0.9.0.0), or
>
>     pyspark --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar
>     ... [import stuff and type those lines]
>
> and I end up with:
>
>     15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>     15/12/17 19:45:00 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
>     Traceback (most recent call last):
>       File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line 80, in <module>
>         lines = KafkaUtils.createDirectStream(ssc, ["test"],
>             {"metadata.broker.list": "localhost:1281"})
>       File "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 130, in createDirectStream
>     py4j.protocol.Py4JJavaError: An error occurred while calling o29.createDirectStream.
>     : java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
>         at kafka.api.RequestKeys$.<init>(RequestKeys.scala:39)
>         at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala)
>         at kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:53)
>         at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
>         at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
>         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
>         at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>         at py4j.Gateway.invoke(Gateway.java:259)
>         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>         at java.lang.Thread.run(Thread.java:745)
>
> Am I missing something?
>
> Thanks in advance,
> Chris M.

--
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/
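For future readers: the NoSuchMethodError on scala.Predef$.ArrowAssoc is the classic symptom of a Scala binary-version mismatch, since the prebuilt spark-1.5.2-bin-hadoop2.6 distribution ships with Scala 2.10 while the `_2.11` artifact was compiled against Scala 2.11. The check can be done mechanically before submitting by comparing the suffix on the Maven coordinate with the Scala version of the Spark build; a minimal sketch (the helper names here are illustrative, not a Spark API):

```python
def scala_suffix(coordinate):
    """Extract the Scala binary version from a Maven coordinate,
    e.g. 'org.apache.spark:spark-streaming-kafka_2.10:1.5.2' -> '2.10'."""
    artifact_id = coordinate.split(":")[1]   # 'spark-streaming-kafka_2.10'
    return artifact_id.rsplit("_", 1)[1]     # '2.10'

def compatible(spark_scala_version, coordinate):
    """True when the package was built for the same Scala binary
    version as the Spark distribution loading it."""
    return scala_suffix(coordinate) == spark_scala_version

# Prebuilt Spark 1.5.2 binaries use Scala 2.10:
compatible("2.10", "org.apache.spark:spark-streaming-kafka_2.10:1.5.2")  # True
compatible("2.10", "org.apache.spark:spark-streaming-kafka_2.11:1.5.2")  # False
```

Note that the Kafka broker's own version string (2.11-0.9.0.0) is irrelevant here: what must match is the Scala version of the Spark distribution and of the `--packages`/`--jars` artifacts it loads.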