When would you read offsets from ZK? Only when starting up? Also, what are
your data retention config values on the broker?
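
For reference, I mean the broker properties that control retention, e.g.
(values below are just examples, not recommendations):

  # time-based retention: delete segments older than 7 days
  log.retention.hours=168
  # size-based retention per partition; -1 means no size limit
  log.retention.bytes=-1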

Guozhang

On Fri, Nov 7, 2014 at 10:30 AM, Jimmy John <jj...@livefyre.com> wrote:

> The current setting is to commit to ZK every 100 messages read.
>
> The read buffer size is 262144 bytes, so we read in a batch of messages at
> a time, and while iterating through those messages we commit the offset to
> ZK every 100.
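>
> Roughly like this (just a sketch of the pattern, not our exact code;
> process() is a placeholder for our handler):
>
>     count = 0
>     for message in consumer:       # consumer yields messages from each fetched batch
>         process(message)
>         count += 1
>         if count % 100 == 0:
>             consumer.commit()      # persist current offsets to ZK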
>
> jim
>
> On Fri, Nov 7, 2014 at 10:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Jim,
> >
> > When messages get cleaned up based on the data retention policy (by time
> > or by size), the brokers will not inform ZK of the deletion event. The
> > underlying assumption is that consumers fetching near the tail of the log
> > (i.e. not lagging much, which is the normal case) will be continuously
> > updating their consumed offsets in ZK, so those offsets will be valid
> > most of the time. When consumers are lagging behind and the old messages
> > have been cleaned up, they will get this exception, and they need to
> > handle it by resetting their offset to, e.g., the head of the log.
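> >
> > In pseudo-Python, the recovery could look like this (assuming a client
> > that raises OffsetOutOfRangeError and supports seeking; exact names vary
> > by client library):
> >
> >     try:
> >         for message in consumer:
> >             process(message)          # placeholder handler
> >     except OffsetOutOfRangeError:
> >         # The committed offset is older than the earliest retained
> >         # segment, so jump to the head of the log and resume.
> >         consumer.seek(0, 0)           # offset 0 relative to log start
> >         consumer.commit()             # persist the reset offset to ZK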
> >
> > How frequently do your clients read / write the offsets in ZK?
> >
> > Guozhang
> >
> > On Thu, Nov 6, 2014 at 6:23 PM, Jimmy John <jj...@livefyre.com> wrote:
> >
> > > Hello,
> > >
> > >   I understand what this error means, just not sure why I keep running
> > > into it after 24-48 hrs of running fine, consuming > 300 messages /
> > > second.
> > >
> > >   What happens when a Kafka log rolls over and some old records are
> > > aged out? I mean, what happens to the offsets? We are using a Python
> > > client which stores the offsets in ZK. But in the middle of the run,
> > > say after 2 days or so, it suddenly gets this error.
> > >
> > > The only possibility is that the older records have aged off and ZK
> > > still has the offset, which is no longer applicable... How does the
> > > Java client deal with this? Does Kafka inform ZK that records have been
> > > aged off and update the offset, or something?
> > >
> > > Here is the error I see in the broker logs:
> > >
> > > [2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing
> > > fetch request for partition [activity.stream,3] offset 8013827 from
> > > consumer with correlation id 73 (kafka.server.KafkaApis)
> > >
> > > kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but
> > > we only have log segments in the range 8603331 to 11279773.
> > >     at kafka.log.Log.read(Log.scala:380)
> > >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> > >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> > >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >     at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
> > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > >     at scala.collection.immutable.Map$Map3.map(Map.scala:144)
> > >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> > >     at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
> > >     at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> > >     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > thx
> > >
> > > Jim
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang