retention.ms is in milliseconds, not minutes. A value of 1440 is only about
1.4 seconds, so you want a much larger value.
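
For reference, here is a rough sketch of the conversion, assuming the intent was 1440 minutes (one day) of retention. The variable names are only illustrative, and the alter command is the one from the quoted message with its XXX:2181 and MyTopic placeholders, printed rather than executed:

```shell
# Convert a retention period given in minutes into the milliseconds
# that retention.ms expects.
retention_minutes=1440   # assumed intent: one day
retention_ms=$((retention_minutes * 60 * 1000))
echo "retention.ms=${retention_ms}"   # prints retention.ms=86400000

# Same alter command as in the quoted message, with the converted value.
# Printed only; XXX:2181 and MyTopic are the original placeholders.
echo "./kafka-topics.sh --alter --zookeeper XXX:2181 --topic MyTopic --config retention.ms=${retention_ms}"
```

With a value in that range, the earliest offset reported by GetOffsetShell should no longer jump to the latest offset within seconds.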


On 8/6/15, 4:35 PM, "Cassa L" <lcas...@gmail.com> wrote:

>Hi Grant,
>Yes, I saw exceptions in both Spark and Kafka. In the Kafka server logs I get
>this exception:
>kafka.common.OffsetOutOfRangeException: Request for offset 2823 but we only have log segments in the range 2824 to 2824.
>        at kafka.log.Log.read(Log.scala:380)
>        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
>        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
>        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
>        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>        at scala.collection.immutable.Map$Map1.map(Map.scala:93)
>        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
>        at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
>        at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
>        at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.sca
>
>A similar exception shows up in the Spark job.
>
>Here are my versions :
>       Spark - 1.4.1
>        Kafka - 0.8.1
>
>I changed retention on config using this command :
>./kafka-topics.sh --alter --zookeeper XXX:2181 --topic MyTopic --config retention.ms=1440
>(I believe this is in minutes)
>
>I am also noticing something in Kafka. When I run the command below on the broker:
>./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list vdc-vm8.apple.com:9092 --topic MyTopic --time -2
>the earliest offset catches up to the latest within just a few seconds. Am I
>correlating this with the issue correctly?
>
>Here is my example on a new topic. The initial output of this command is:
>./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list vdc-vm8.apple.com:9092 --topic MyTopic --time -2
>SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>SLF4J: Defaulting to no-operation (NOP) logger implementation
>SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>MyTopic:0:60
>
>I published 4 messages to Kafka. Within a few seconds, the command
>output is:
>          MyTopic:0:64
>Isn't this supposed to stay at 60 for a longer time, based on the retention
>policy?
>
>
>Thanks,
>Leena
>
>
>On Thu, Aug 6, 2015 at 12:09 PM, Grant Henke <ghe...@cloudera.com> wrote:
>
>> Does this Spark Jira match up with what you are seeing or sound related?
>> https://issues.apache.org/jira/browse/SPARK-8474
>>
>> What versions of Spark and Kafka are you using? Can you include more of
>> the Spark log? Any errors shown in the Kafka log?
>>
>> Thanks,
>> Grant
>>
>> On Thu, Aug 6, 2015 at 1:17 PM, Cassa L <lcas...@gmail.com> wrote:
>>
>> > Hi,
>> > Has anyone tried the streaming API of Spark with Kafka? I am experimenting
>> > with the new Spark API to read from Kafka.
>> > KafkaUtils.createDirectStream(...)
>> >
>> > Every now and then, I get the following error, "spark
>> > kafka.common.OffsetOutOfRangeException", and my Spark script stops working.
>> > I have a simple topic with just one partition.
>> >
>> > I would appreciate any clues on how to debug this issue.
>> >
>> > Thanks,
>> > LCassa
>> >
>>
>>
>>
>> --
>> Grant Henke
>> Software Engineer | Cloudera
>> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>>
