Could you check the Scala version of your Kafka?
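(For reference: the Scala version is embedded in the Kafka artifact name, e.g. kafka_2.11-0.9.0.0 is Kafka 0.9.0.0 built against Scala 2.11. A small sketch, assuming only that naming convention, of pulling the two versions apart:)

```python
import re

def kafka_versions(artifact_name):
    """Split a Kafka artifact name like 'kafka_2.11-0.9.0.0' into
    (scala_version, kafka_version); return None if it doesn't match."""
    m = re.match(r"kafka_(\d+\.\d+)-([\d.]+)$", artifact_name)
    return (m.group(1), m.group(2)) if m else None

# The first component must match the Scala version of the
# spark-streaming-kafka artifact you pass to spark-submit.
print(kafka_versions("kafka_2.11-0.9.0.0"))  # ('2.11', '0.9.0.0')
```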

Best Regards,
Shixiong Zhu
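
P.S. Regarding the ClosedChannelException in the quoted trace: createStream takes the ZooKeeper quorum (default port 2181), while createDirectStream's metadata.broker.list must point at a Kafka broker itself (default port 9092), and a closed channel usually means nothing is listening at the given broker port. A hypothetical little checker, not part of Spark or Kafka, just to illustrate the distinction:

```python
def check_broker_list(broker_list):
    """Return warnings for suspicious host:port entries in a
    metadata.broker.list value (hypothetical helper, not a Spark API)."""
    warnings = []
    for entry in broker_list.split(","):
        host, _, port = entry.strip().partition(":")
        if not port.isdigit():
            warnings.append("missing or non-numeric port in %r" % entry)
        elif port == "2181":
            # 2181 is ZooKeeper's default port; the direct stream talks to
            # the brokers directly (default 9092), not to ZooKeeper.
            warnings.append("%r looks like a ZooKeeper port, not a broker" % entry)
    return warnings

print(check_broker_list("localhost:2181"))
```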

2015-12-18 2:31 GMT-08:00 Christos Mantas <cman...@cslab.ece.ntua.gr>:

> Thank you, Luciano, Shixiong.
>
> I thought the "_2.11" part referred to the Kafka version - an unfortunate
> coincidence.
>
> Indeed,
>
> spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar
> my_kafka_streaming_wordcount.py
> or
> spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2
> my_kafka_streaming_wordcount.py
>
> worked. However, it needed another amendment:
>
> lines = KafkaUtils.createStream(ssc, "localhost:2181", "consumer-group",
> {"test": 1})
>
> creates the DStream with no error, but
>
> lines = KafkaUtils.createDirectStream(ssc, ["test"],
> {"metadata.broker.list":"localhost:1281"})
>
> produces a
>
> Py4JJavaError: An error occurred while calling o29.createDirectStream.
> : org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>     at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>     at scala.util.Either.fold(Either.scala:97)
>     at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>     at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
>     at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
>     at
> org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
> ....
> *more lines here .... *
>
> I am mentioning it in case anyone else lands in the list archives trying
> to run a simple PySpark Streaming + Kafka "Hello World".
>
> Best Regards
> Chris M.
>
>
>
> On 12/18/2015 04:20 AM, Luciano Resende wrote:
>
> Unless you built your own Spark distribution with Scala 2.11, you want to
> use the 2.10 dependency:
>
>    --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2
>
> On Thu, Dec 17, 2015 at 10:10 AM, Christos Mantas <
> cman...@cslab.ece.ntua.gr> wrote:
>
>> Hello,
>>
>> I am trying to set up a simple example with Spark Streaming (Python) and
>> Kafka on a single machine deployment.
>> My Kafka broker/server is also on the same machine (localhost:1281) and I
>> am using Spark Version: spark-1.5.2-bin-hadoop2.6
>>
>> Python code
>>
>> ...
>> ssc = StreamingContext(sc, 1)
>> ...
>> lines = KafkaUtils.createDirectStream(ssc, ["test"],
>> {"metadata.broker.list":"localhost:1281"})
>>
>>
>> So I try
>>
>> spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar
>> my_kafka_streaming_wordcount.py
>>
>> OR
>>
>> spark-submit --packages
>> org.apache.spark:spark-streaming-kafka_2.11:1.5.2
>> my_kafka_streaming_wordcount.py
>> (my Kafka version is 2.11-0.9.0.0)
>>
>> OR
>>
>> pyspark  --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar ....
>> [import stuff and type those lines]
>>
>>
>> and I end up with:
>>
>> 15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/12/17 19:45:00 WARN MetricsSystem: Using default name DAGScheduler for
>> source because spark.app.id is not set.
>> Traceback (most recent call last):
>>   File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line
>> 80, in <module>
>>     lines = KafkaUtils.createDirectStream(ssc, ["test"],
>> {"metadata.broker.list":"localhost:1281"})
>>   File
>> "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py",
>> line 130, in createDirectStream
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o29.createDirectStream.
>> : java.lang.NoSuchMethodError:
>> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
>>     at kafka.api.RequestKeys$.<init>(RequestKeys.scala:39)
>>     at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala)
>>     at
>> kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:53)
>>     at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
>>     at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>     at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
>>     at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
>>     at
>> org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>>     at py4j.Gateway.invoke(Gateway.java:259)
>>     at
>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> Am I missing something?
>>
>> Thanks in advance
>> Chris M.
>>
>
>
> --
> Luciano Resende
> http://people.apache.org/~lresende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>
