Could you check the Scala version of your Kafka?

Best Regards,
Shixiong Zhu
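A note for the archives: Kafka's release artifacts encode the Scala version in their names, so the answer can be read off the core jar in Kafka's libs/ directory: kafka_2.11-0.9.0.0.jar means Kafka 0.9.0.0 built against Scala 2.11. A minimal sketch in Python, assuming the KAFKA_HOME environment variable points at the Kafka installation:

    # Sketch: read Kafka's Scala version from the core jar name, e.g.
    # libs/kafka_2.11-0.9.0.0.jar -> Scala 2.11, Kafka 0.9.0.0.
    # Assumes KAFKA_HOME points at the Kafka installation.
    import glob
    import os
    import re

    pattern = os.path.join(os.environ["KAFKA_HOME"], "libs", "kafka_*.jar")
    for jar in glob.glob(pattern):
        m = re.match(r"kafka_(\d+\.\d+)-([\d.]+)\.jar$", os.path.basename(jar))
        if m:
            print("Scala %s / Kafka %s" % m.groups())

The same naming convention gives the _2.10 / _2.11 suffix on the spark-streaming-kafka artifacts, and that suffix has to match the Scala version Spark itself was built with, not the broker's.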
2015-12-18 2:31 GMT-08:00 Christos Mantas <cman...@cslab.ece.ntua.gr>:

> Thank you, Luciano, Shixiong.
>
> I thought the "_2.11" part referred to the Kafka version - an unfortunate
> coincidence.
>
> Indeed, either of
>
>   spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar my_kafka_streaming_wordcount.py
>
> OR
>
>   spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 my_kafka_streaming_wordcount.py
>
> worked. However, it needed another amendment:
>
>   lines = KafkaUtils.createStream(ssc, "localhost:2181", "consumer-group", {"test": 1})
>
> creates the DStream with no error, but
>
>   lines = KafkaUtils.createDirectStream(ssc, ["test"], {"metadata.broker.list": "localhost:1281"})
>
> produces a
>
>   Py4JJavaError: An error occurred while calling o29.createDirectStream.
>   : org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>     at scala.util.Either.fold(Either.scala:97)
>     at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
>     at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
>     ... more lines here ...
>
> I am mentioning it in case anyone else lands in the list archives trying
> to run a simple PySpark Streaming + Kafka "hello world".
>
> Best Regards,
> Chris M.
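For anyone who lands here with the same ClosedChannelException: createStream talks to Zookeeper, whose default port is 2181, while createDirectStream connects to the Kafka brokers themselves, whose default port is 9092. A ClosedChannelException from KafkaCluster usually means nothing is listening at the address given in metadata.broker.list, so the "localhost:1281" above may simply be a typo for the broker's actual port. A minimal sketch of both calls, assuming Zookeeper on localhost:2181, a broker on the default localhost:9092, and a topic named "test":

    # Sketch only; assumes Zookeeper on localhost:2181 and a Kafka
    # broker on the default localhost:9092 with a topic named "test".
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="kafka_hello_world")
    ssc = StreamingContext(sc, 1)  # 1-second batches

    # Receiver-based stream: connects to Zookeeper (port 2181 by default).
    zk_lines = KafkaUtils.createStream(
        ssc, "localhost:2181", "consumer-group", {"test": 1})

    # Direct stream: connects to the brokers, so it needs the broker
    # port (9092 by default), not the Zookeeper port.
    direct_lines = KafkaUtils.createDirectStream(
        ssc, ["test"], {"metadata.broker.list": "localhost:9092"})

Both return DStreams of (key, message) pairs.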
> On 12/18/2015 04:20 AM, Luciano Resende wrote:
>
> Unless you built your own Spark distribution with Scala 2.11, you want to
> use the 2.10 dependency:
>
>   --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2
>
> On Thu, Dec 17, 2015 at 10:10 AM, Christos Mantas <cman...@cslab.ece.ntua.gr> wrote:
>
>> Hello,
>>
>> I am trying to set up a simple example with Spark Streaming (Python) and
>> Kafka on a single-machine deployment.
>> My Kafka broker/server is also on the same machine (localhost:1281), and I
>> am using Spark version spark-1.5.2-bin-hadoop2.6.
>>
>> Python code:
>>
>>   ...
>>   ssc = StreamingContext(sc, 1)
>>   ...
>>   lines = KafkaUtils.createDirectStream(ssc, ["test"], {"metadata.broker.list": "localhost:1281"})
>>
>> So I try
>>
>>   spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar my_kafka_streaming_wordcount.py
>>
>> OR
>>
>>   spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.5.2 my_kafka_streaming_wordcount.py
>>   (my Kafka version is 2.11-0.9.0.0)
>>
>> OR
>>
>>   pyspark --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar ....
>>   [import stuff and type those lines]
>>
>> and I end up with:
>>
>>   15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load native-hadoop
>>     library for your platform... using builtin-java classes where applicable
>>   15/12/17 19:45:00 WARN MetricsSystem: Using default name DAGScheduler for
>>     source because spark.app.id is not set.
>>   Traceback (most recent call last):
>>     File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line 80, in <module>
>>       lines = KafkaUtils.createDirectStream(ssc, ["test"], {"metadata.broker.list": "localhost:1281"})
>>     File "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py",
>>       line 130, in createDirectStream
>>   py4j.protocol.Py4JJavaError: An error occurred while calling o29.createDirectStream.
>>   : java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
>>     at kafka.api.RequestKeys$.<init>(RequestKeys.scala:39)
>>     at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala)
>>     at kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:53)
>>     at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
>>     at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
>>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
>>     at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>>     at py4j.Gateway.invoke(Gateway.java:259)
>>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> Am I missing something?
>>
>> Thanks in advance,
>> Chris M.
>
>
> --
> Luciano Resende
> http://people.apache.org/~lresende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
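A closing note for the archives: a NoSuchMethodError on scala.Predef$.ArrowAssoc, as in the trace above, is the classic signature of mixing code compiled for Scala 2.11 with a Scala 2.10 runtime (or vice versa). The prebuilt spark-1.5.2-bin-hadoop2.6 distribution is built against Scala 2.10, which is why the _2.10 connector artifacts are the ones to use here. One way to confirm the Scala version of a running PySpark session is to ask the JVM through the py4j gateway; this is a sketch relying on sc._jvm, an internal handle rather than a public API:

    # Sketch: ask the Spark JVM which Scala version it is running on.
    # sc._jvm is a py4j internal, so treat this as a diagnostic trick,
    # not supported usage.
    from pyspark import SparkContext

    sc = SparkContext(appName="scala_version_check")
    print(sc._jvm.scala.util.Properties.versionString())  # e.g. "version 2.10.4"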