OK so this was Kafka issue?

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 7 June 2016 at 16:55, Dominik Safaric <dominiksafa...@gmail.com> wrote:

> Dear all,
>
> I managed to resolve the issue. Since I kept getting the exception
> "org.apache.spark.SparkException: java.nio.channels.ClosedChannelException”,
>
> a reasonable direction was checking the advertised.host.name key which as
> I’ve read from the docs basically sets for the broker the host.name it
> should advertise to the consumers and producers.
>
> By setting this property, I instantly started receiving Kafka log messages.
>
> Nevertheless, thank you all for your help, I appreciate it!
>
> On 07 Jun 2016, at 17:44, Dominik Safaric <dominiksafa...@gmail.com>
> wrote:
>
> Dear Todd,
>
> By running bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic
> <topic_name> --broker-list localhost:9092 --time -1
>
> I get the following current offset for <topic_name> <topic_name>:0:1760
>
> But I guess this does not provide as much information.
>
> To answer your other question, onto how exactly do I track the offset -
> implicitly via Spark Streaming, i.e. using the default checkpoints.
>
> On 07 Jun 2016, at 15:46, Todd Nist <tsind...@gmail.com> wrote:
>
> Hi Dominik,
>
> Right, and spark 1.6.x uses Kafka v0.8.2.x as I recall.  However, it
> appears as though the v.0.8 consumer is compatible with the Kafka v0.9.x
> broker, but not the other way around; sorry for the confusion there.
>
> With the direct stream, simple consumer, offsets are tracked by Spark
> Streaming within its checkpoints by default.  You can also manage them
> yourself if desired.  How are you dealing with offsets ?
>
> Can you verify the offsets on the broker:
>
> kafka-run-class.sh kafka.tools.GetOffsetShell --topic <TOPIC>
> --broker-list <BROKER-IP:PORT> --time -1
>
> -Todd
>
> On Tue, Jun 7, 2016 at 8:17 AM, Dominik Safaric <dominiksafa...@gmail.com>
> wrote:
>
>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0"
>> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
>> libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % 
>> "1.6.1"
>>
>> Please take a look at the SBT copy.
>>
>> I would rather think that the problem is related to the Zookeeper/Kafka
>> consumers.
>>
>> [2016-06-07 11:24:52,484] WARN Either no config or no quorum defined in
>> config, running  in standalone mode
>> (org.apache.zookeeper.server.quorum.QuorumPeerMain)
>>
>> Any indication onto why the channel connection might be closed? Would it
>> be Kafka or Zookeeper related?
>>
>> On 07 Jun 2016, at 14:07, Todd Nist <tsind...@gmail.com> wrote:
>>
>> What version of Spark are you using?  I do not believe that 1.6.x is
>> compatible with 0.9.0.1 due to changes in the kafka clients between 0.8.2.2
>> and 0.9.0.x.  See this for more information:
>>
>> https://issues.apache.org/jira/browse/SPARK-12177
>>
>> -Todd
>>
>> On Tue, Jun 7, 2016 at 7:35 AM, Dominik Safaric <dominiksafa...@gmail.com
>> > wrote:
>>
>>> Hi,
>>>
>>> Correct, I am using the 0.9.0.1 version.
>>>
>>> As already described, the topic contains messages. Those messages are
>>> produced using the Confluence REST API.
>>>
>>> However, what I’ve observed is that the problem is not in the Spark
>>> configuration, but rather Zookeeper or Kafka related.
>>>
>>> Take a look at the exception’s stack top item:
>>>
>>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([<topicname>,0])
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>> at scala.util.Either.fold(Either.scala:97)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>> at org.mediasoft.spark.Driver$.main(Driver.scala:22)
>>> at .<init>(<console>:11)
>>> at .<clinit>(<console>)
>>> at .<init>(<console>:7)
>>>
>>> By listing all active connections using netstat, I’ve also observed that
>>> both Zookeper and Kafka are running. Zookeeper on port 2181, while Kafka
>>> 9092.
>>>
>>> Furthermore, I am also able to retrieve all log messages using the
>>> console consumer.
>>>
>>> Any clue what might be going wrong?
>>>
>>> On 07 Jun 2016, at 13:13, Jacek Laskowski <ja...@japila.pl> wrote:
>>>
>>> Hi,
>>>
>>> What's the version of Spark? You're using Kafka 0.9.0.1, ain't you?
>>> What's the topic name?
>>>
>>> Jacek
>>> On 7 Jun 2016 11:06 a.m., "Dominik Safaric" <dominiksafa...@gmail.com>
>>> wrote:
>>>
>>>> As I am trying to integrate Kafka into Spark, the following exception
>>>> occurs:
>>>>
>>>> org.apache.spark.SparkException:
>>>> java.nio.channels.ClosedChannelException
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>> Set([*<topicName>*,0])
>>>>         at
>>>>
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>>         at
>>>>
>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>>         at scala.util.Either.fold(Either.scala:97)
>>>>         at
>>>>
>>>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>>>         at
>>>>
>>>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>>>         at
>>>>
>>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>>>         at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>>>>         at .<init>(<console>:11)
>>>>         at .<clinit>(<console>)
>>>>         at .<init>(<console>:7)
>>>>         at .<clinit>(<console>)
>>>>         at $print(<console>)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>>
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at
>>>>
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>>         at
>>>> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>>>>         at
>>>> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>>>>         at
>>>> scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>>>>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>>>>         at
>>>> scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>>>>         at
>>>> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>>>>         at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>>>>         at
>>>> scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>>>>         at
>>>> scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>>>>         at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>>>>         at
>>>>
>>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>>>>         at
>>>>
>>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>>         at
>>>>
>>>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>>         at
>>>>
>>>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>>>         at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>>>>         at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
>>>>         at
>>>>
>>>> org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>>
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at
>>>>
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:483)
>>>>         at
>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>>>
>>>> As for the Spark configuration:
>>>>
>>>>    val conf: SparkConf = new
>>>> SparkConf().setAppName("AppName").setMaster("local[2]")
>>>>
>>>>     val confParams: Map[String, String] = Map(
>>>>       "metadata.broker.list" -> "<IP_ADDRESS>:9092",
>>>>       "auto.offset.reset" -> "largest"
>>>>     )
>>>>
>>>>     val topics: Set[String] = Set("<topic_name>")
>>>>
>>>>     val context: StreamingContext = new StreamingContext(conf,
>>>> Seconds(1))
>>>>     val kafkaStream = KafkaUtils.createDirectStream(context,confParams,
>>>> topics)
>>>>
>>>>     kafkaStream.foreachRDD(rdd => {
>>>>       rdd.collect().foreach(println)
>>>>     })
>>>>
>>>>     context.awaitTermination()
>>>>     context.start()
>>>>
>>>> The Kafka topic does exist, Kafka server is up and running and I am
>>>> able to
>>>> produce messages to that particular topic using the Confluent REST API.
>>>>
>>>> What might the problem actually be?
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Kafka-Integration-org-apache-spark-SparkException-Couldn-t-find-leader-offsets-for-Set-tp27103.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>>>> <http://nabble.com/>.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>>
>
>
>

Reply via email to