Ok. The problem was resolved once I increased the retention policy for the topic. But now I see that whenever I restart the Spark job, some old messages are pulled in again by the Spark stream. With the new Spark streaming API, do we need to keep track of offsets ourselves?
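From what I can tell, with createDirectStream the consumed offsets live only in the Spark checkpoint (if one is enabled at all), so on a plain restart the stream may replay old data unless we persist offsets ourselves and seed the stream with them via fromOffsets. Here is a rough, untested sketch of what I have in mind (loadOffsets/saveOffsets are placeholder helpers for whatever store one would use, and the broker address is just my test box):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Placeholder helpers: persist offsets in ZooKeeper, a DB, a file, etc.
def loadOffsets(): Map[TopicAndPartition, Long] =
  Map(TopicAndPartition("MyTopic", 0) -> 0L)  // fall back to 0 on first run
def saveOffsets(ranges: Array[OffsetRange]): Unit =
  ranges.foreach(r => println(s"${r.topic}:${r.partition} -> ${r.untilOffset}"))

val ssc = new StreamingContext(new SparkConf().setAppName("MyTopicConsumer"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "vdc-vm8.apple.com:9092")

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, loadOffsets(),
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

stream.foreachRDD { rdd =>
  // Every RDD from the direct stream carries the offset ranges it covers
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  saveOffsets(ranges)  // record progress only after processing succeeds
}

ssc.start()
ssc.awaitTermination()

Or is enabling checkpointing alone supposed to cover this? As far as I understand, checkpoints do not survive application code changes, which is why I am leaning toward storing offsets externally.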
LCassa

On Thu, Aug 6, 2015 at 4:58 PM, Grant Henke <ghe...@cloudera.com> wrote:

> Looks like this is likely a case very similar to the case Parth mentioned
> storm users have seen, when processing falls behind the retention period.
>
> Perhaps Spark and Kafka can handle this scenario more gracefully. I would
> be happy to do some investigation/testing and report back with findings
> and potentially open a Jira to track any fix.
>
> On Thu, Aug 6, 2015 at 6:48 PM, Parth Brahmbhatt <pbrahmbh...@hortonworks.com> wrote:
>
> > retention.ms is actually milliseconds; you want a value much larger than
> > 1440, which translates to 1.4 seconds.
> >
> > On 8/6/15, 4:35 PM, "Cassa L" <lcas...@gmail.com> wrote:
> >
> > > Hi Grant,
> > > Yes, I saw exceptions in both Spark and Kafka. In the Kafka server logs
> > > I get this exception:
> > > kafka.common.OffsetOutOfRangeException: Request for offset 2823 but we
> > > only have log segments in the range 2824 to 2824.
> > >     at kafka.log.Log.read(Log.scala:380)
> > >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> > >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> > >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >     at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > >     at scala.collection.immutable.Map$Map1.map(Map.scala:93)
> > >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> > >     at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
> > >     at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
> > >     at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.sca
> > >
> > > A similar exception shows up in the Spark job.
> > >
> > > Here are my versions:
> > > Spark - 1.4.1
> > > Kafka - 0.8.1
> > >
> > > I changed the retention on the config using this command:
> > > ./kafka-topics.sh --alter --zookeeper XXX:2181 --topic MyTopic --config retention.ms=1440
> > > (I believe this is in minutes)
> > >
> > > I am also noticing something in Kafka. When I run the command below on
> > > the broker:
> > > ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list vdc-vm8.apple.com:9092 --topic MyTopic --time -2
> > > the earliest offset is set to the latest within just a few seconds. Am I
> > > correlating this issue correctly?
> > >
> > > Here is my example on a new topic. The initial output of this command:
> > > ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list vdc-vm8.apple.com:9092 --topic MyTopic --time -2
> > > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> > > MyTopic:0:60
> > >
> > > I published 4 messages to Kafka. Just a few seconds later, the command
> > > output is:
> > > MyTopic:0:64
> > > Isn't this supposed to stay at 60 for a longer time, based on the
> > > retention policy?
> > >
> > > Thanks,
> > > Leena
> > >
> > > On Thu, Aug 6, 2015 at 12:09 PM, Grant Henke <ghe...@cloudera.com> wrote:
> > >
> > >> Does this Spark Jira match up with what you are seeing, or sound related?
> > >> https://issues.apache.org/jira/browse/SPARK-8474
> > >>
> > >> What versions of Spark and Kafka are you using? Can you include more of
> > >> the Spark log? Are any errors shown in the Kafka log?
> > >>
> > >> Thanks,
> > >> Grant
> > >>
> > >> On Thu, Aug 6, 2015 at 1:17 PM, Cassa L <lcas...@gmail.com> wrote:
> > >>
> > >> > Hi,
> > >> > Has anyone tried the streaming API of Spark with Kafka? I am
> > >> > experimenting with the new Spark API to read from Kafka:
> > >> > KafkaUtils.createDirectStream(...)
> > >> >
> > >> > Every now and then, I get the following error, "spark
> > >> > kafka.common.OffsetOutOfRangeException", and my Spark script stops
> > >> > working. I have a simple topic with just one partition.
> > >> >
> > >> > I would appreciate any clues on how to debug this issue.
> > >> >
> > >> > Thanks,
> > >> > LCassa
> > >> >
> > >>
> > >> --
> > >> Grant Henke
> > >> Software Engineer | Cloudera
> > >> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> >
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke