The current setting is to commit the offset to ZK every 100 messages read, and the read buffer size is 262144 bytes. So we read a batch of messages at a time, and while iterating through that batch we commit the offset to ZK after every 100 messages.
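Roughly, the consumer loop looks like this (a simplified sketch; fetch_batch, commit_offset_to_zk, etc. are illustrative wrapper names, not the actual client API):

    COMMIT_INTERVAL = 100       # commit consumed offset to ZK every 100 messages
    FETCH_SIZE_BYTES = 262144   # read buffer size per fetch request

    offset = read_offset_from_zk(topic, partition)
    while True:
        # this fetch is what eventually fails with OffsetOutOfRangeException
        # once the stored offset falls below the oldest retained segment
        batch = fetch_batch(topic, partition, offset, FETCH_SIZE_BYTES)
        for i, message in enumerate(batch, 1):
            process(message)
            offset = message.offset + 1
            if i % COMMIT_INTERVAL == 0:
                commit_offset_to_zk(topic, partition, offset)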
jim

On Fri, Nov 7, 2014 at 10:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Jim,
>
> When messages get cleaned up based on the data retention policy (by time
> or by size), the brokers do not inform ZK of the deletion event. The
> underlying assumption is that consumers fetching near the tail of the log
> (i.e. not lagging much, which is the normal case) are continuously
> updating their consumed offsets in ZK, so those offsets will be valid
> most of the time. When consumers are lagging behind and the old messages
> are cleaned up, they will get this exception, and they need to handle it
> by resetting their offset to, e.g., the head of the log.
>
> How frequently do your clients read / write the offsets in ZK?
>
> Guozhang
>
> On Thu, Nov 6, 2014 at 6:23 PM, Jimmy John <jj...@livefyre.com> wrote:
>
> > Hello,
> >
> > I understand what this error means, just not sure why I keep running
> > into it after 24-48 hrs of running fine, consuming > 300 messages /
> > second.
> >
> > What happens when a Kafka log rolls over and some old records are aged
> > out? I mean, what happens to the offsets? We are using a Python client
> > which stores the offsets in ZK. But in the middle of the run, say after
> > 2 days or so, it suddenly gets this error.
> >
> > The only possibility is that the older records have aged off and ZK
> > still has an offset which is no longer applicable... How does the Java
> > client deal with this? Does Kafka inform ZK that records have been aged
> > off and update the offset or something?
> >
> > Here is the error I see in the broker logs:
> >
> > [2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing
> > fetch request for partition [activity.stream,3] offset 8013827 from
> > consumer with correlation id 73 (kafka.server.KafkaApis)
> >
> > kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but
> > we only have log segments in the range 8603331 to 11279773.
> >
> >     at kafka.log.Log.read(Log.scala:380)
> >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >     at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
> >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> >     at scala.collection.immutable.Map$Map3.map(Map.scala:144)
> >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> >     at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
> >     at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> >     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > thx
> >
> > Jim
>
>
> --
> -- Guozhang