The current setting is to commit the offset to ZK every 100 messages read.

The read buffer size is 262144 bytes, so we fetch a batch of messages at a
time, and while iterating through that batch we commit the offset to ZK
every 100 messages.
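
For reference, here is a rough sketch of that loop using the kazoo ZK
client (a real library); fetch_messages() is a hypothetical stand-in for
whatever fetch call your consumer library exposes, and the offset path is
just an example:

    from kazoo.client import KazooClient

    COMMIT_INTERVAL = 100  # commit the offset to ZK every 100 messages read
    OFFSET_PATH = "/consumers/mygroup/offsets/activity.stream/3"  # example

    def fetch_messages():
        """Hypothetical stand-in: yields (offset, payload) tuples from one
        fetched batch (our read buffer is 262144 bytes)."""
        raise NotImplementedError

    zk = KazooClient(hosts="zk1:2181")
    zk.start()
    zk.ensure_path(OFFSET_PATH)

    count = 0
    for offset, payload in fetch_messages():
        # ... process the message payload here ...
        count += 1
        if count % COMMIT_INTERVAL == 0:
            # Store the next offset to consume as the znode's data (bytes).
            zk.set(OFFSET_PATH, str(offset + 1).encode("ascii"))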

jim

On Fri, Nov 7, 2014 at 10:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Jim,
>
> When messages get cleaned up based on the data retention policy (by time
> or by size), the brokers do not inform ZK of the deletion event. The
> underlying assumption is that when consumers are fetching data near the
> tail of the log (i.e. they are not lagging much, which is the normal case),
> they are continuously updating the consumed offsets in ZK, and hence
> those offsets will be valid most of the time. When consumers are lagging
> behind and the old messages are cleaned up, they will get this exception,
> and they need to handle it by resetting their offset to, e.g., the head of
> the log.
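>
> A rough sketch of that reset handling on the client side (only the kazoo
> ZK calls are a real API; the error class, the consume loop, and the
> head-offset lookup are hypothetical stand-ins for whatever your client
> library provides):
>
>     from kazoo.client import KazooClient
>
>     OFFSET_PATH = "/consumers/mygroup/offsets/activity.stream/3"  # example
>
>     class OffsetOutOfRangeError(Exception):
>         """Stand-in for the error your Kafka client raises."""
>
>     def consume_from(offset):
>         """Hypothetical consume loop starting at the given offset."""
>         raise NotImplementedError
>
>     def earliest_available_offset():
>         """Hypothetical: ask the broker for the oldest retained offset."""
>         raise NotImplementedError
>
>     zk = KazooClient(hosts="zk1:2181")
>     zk.start()
>
>     try:
>         data, _stat = zk.get(OFFSET_PATH)   # kazoo returns (data, stat)
>         consume_from(int(data.decode("ascii")))
>     except OffsetOutOfRangeError:
>         # The stored offset was aged out by retention; reset to the head.
>         head = earliest_available_offset()
>         zk.set(OFFSET_PATH, str(head).encode("ascii"))
>         consume_from(head)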
>
> How frequently do your clients read/write the offsets in ZK?
>
> Guozhang
>
> On Thu, Nov 6, 2014 at 6:23 PM, Jimmy John <jj...@livefyre.com> wrote:
>
> > Hello,
> >
> >   I understand what this error means; I'm just not sure why I keep
> > running into it after 24-48 hrs of running fine, consuming > 300
> > messages/second.
> >
> >   What happens when a Kafka log rolls over and some old records are
> > aged out? I mean, what happens to the offsets? We are using a Python
> > client which stores the offsets in ZK, but in the middle of the run,
> > say after 2 days or so, it suddenly gets this error.
> >
> > The only possibility is that the older records have aged off and ZK
> > still has an offset which is no longer applicable... How does the Java
> > client deal with this? Does Kafka inform ZK that records have been aged
> > off and update the offset, or something?
> >
> > Here is the error I see in the broker logs:
> >
> > [2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing fetch request for partition [activity.stream,3] offset 8013827 from consumer with correlation id 73 (kafka.server.KafkaApis)
> >
> > kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but we only have log segments in the range 8603331 to 11279773.
> >     at kafka.log.Log.read(Log.scala:380)
> >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >     at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
> >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> >     at scala.collection.immutable.Map$Map3.map(Map.scala:144)
> >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> >     at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
> >     at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> >     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> >
> > thx
> >
> > Jim
> >
>
>
>
> --
> -- Guozhang
>
