Hi,

Correct, I am using the 0.9.0.1 version. 

As already described, the topic contains messages. Those messages are produced 
using the Confluent REST API.

However, from what I’ve observed, the problem does not seem to be in the Spark 
configuration, but rather related to Zookeeper or Kafka. 

Take a look at the top of the exception’s stack trace:

org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([<topicname>,0])
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at org.mediasoft.spark.Driver$.main(Driver.scala:22)
        at .<init>(<console>:11)
        at .<clinit>(<console>)
        at .<init>(<console>:7)

By listing all active connections using netstat, I’ve also observed that both 
Zookeeper and Kafka are running: Zookeeper on port 2181 and Kafka on port 9092. 

Furthermore, I am also able to retrieve all log messages using the console 
consumer.
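
A listening port shown by netstat on the broker host does not by itself 
confirm that the address given in metadata.broker.list can be reached from the 
machine running the Spark driver. As a rough sketch of how that could be 
checked from the JVM (BrokerCheck, parseBrokers and canConnect are hypothetical 
helper names, not part of Spark or Kafka, and the default localhost:9092 is 
only a placeholder):

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not part of Spark or Kafka.
object BrokerCheck {

  // Split a "host1:port1,host2:port2" string (the format used by
  // metadata.broker.list) into (host, port) pairs.
  def parseBrokers(list: String): Seq[(String, Int)] =
    list.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      val idx = entry.lastIndexOf(':')
      require(idx > 0, s"expected host:port, got '$entry'")
      (entry.substring(0, idx), entry.substring(idx + 1).toInt)
    }

  // Return true if a TCP connection to host:port can be opened within
  // timeoutMs milliseconds; any I/O failure (refused connection, timeout,
  // unresolvable host name) is reported as false.
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Placeholder default; pass the real broker list as the first argument.
    val brokers = args.headOption.getOrElse("localhost:9092")
    parseBrokers(brokers).foreach { case (host, port) =>
      println(s"$host:$port reachable = ${canConnect(host, port)}")
    }
  }
}
```

Running it with the same value that is passed as metadata.broker.list shows 
whether each listed broker accepts a TCP connection from the driver's machine. 
It does not check the host name the broker advertises to its clients, so a 
successful result narrows the problem down rather than ruling Kafka out.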

Any clue what might be going wrong?

> On 07 Jun 2016, at 13:13, Jacek Laskowski <[email protected]> wrote:
> 
> Hi,
> 
> What's the version of Spark? You're using Kafka 0.9.0.1, ain't you? What's 
> the topic name?
> 
> Jacek
> 
> On 7 Jun 2016 11:06 a.m., "Dominik Safaric" <[email protected]> wrote:
> As I am trying to integrate Kafka into Spark, the following exception occurs:
> 
> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: Couldn't find leader offsets for Set([<topicName>,0])
>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at scala.util.Either.fold(Either.scala:97)
>         at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>         at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>         at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>         at .<init>(<console>:11)
>         at .<clinit>(<console>)
>         at .<init>(<console>:7)
>         at .<clinit>(<console>)
>         at $print(<console>)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>         at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>         at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>         at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>         at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>         at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>         at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>         at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>         at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>         at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>         at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>         at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>         at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>         at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>         at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
>         at org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> 
> As for the Spark configuration:
> 
>     val conf: SparkConf = new SparkConf().setAppName("AppName").setMaster("local[2]")
> 
>     val confParams: Map[String, String] = Map(
>       "metadata.broker.list" -> "<IP_ADDRESS>:9092",
>       "auto.offset.reset" -> "largest"
>     )
> 
>     val topics: Set[String] = Set("<topic_name>")
> 
>     val context: StreamingContext = new StreamingContext(conf, Seconds(1))
>     val kafkaStream = KafkaUtils.createDirectStream(context, confParams, topics)
> 
>     kafkaStream.foreachRDD(rdd => {
>       rdd.collect().foreach(println)
>     })
> 
>     // start() has to be called before awaitTermination()
>     context.start()
>     context.awaitTermination()
> 
> The Kafka topic does exist, Kafka server is up and running and I am able to
> produce messages to that particular topic using the Confluent REST API.
> 
> What might the problem actually be?
> 