When would you read offsets from ZK, only when starting up? Also what is your data retention config values on the broker?
Guozhang On Fri, Nov 7, 2014 at 10:30 AM, Jimmy John <jj...@livefyre.com> wrote: > The current setting is to commit to ZK every 100 messages read. > > The read buffer size is 262144 bytes. So we will read in a bunch of > messages in a batch. And while iterating through those messages, we commit > the offset to ZK every 100. > > jim > > On Fri, Nov 7, 2014 at 10:13 AM, Guozhang Wang <wangg...@gmail.com> wrote: > > > Hi Jim, > > > > When messages gets cleaned based on data retention policy (by time or by > > size), the brokers will not inform ZK for the deletion event. The > > underlying assumption is that when consumers are fetching data at around > > the tail of the log (i.e. they are not much lagging, which is normal > cases) > > they should be continuously update the consumed offsets in ZK and hence > > that offsets will be valid most of the time. When consumers are lagging > > behind and the old messages are cleaned they will get this exception, and > > consumers need to handle it by resetting their offset to, e.g. the head > of > > the log. > > > > How frequent do your clients read / write the offsets in ZK? > > > > Guozhang > > > > On Thu, Nov 6, 2014 at 6:23 PM, Jimmy John <jj...@livefyre.com> wrote: > > > > > Hello, > > > > > > I understand what this error means, just not sure why I keep running > > into > > > it after 24-48 hrs of running fine consuming > 300 messages / second. > > > > > > What happens when a kafka log rolls over and some old records are > aged > > > out? I mean what happens to the offsets? We are using a python client > > which > > > stores the offsets in ZK. But in the middle of the run, say after 2 > days > > or > > > so, suddenly it gets this error. > > > > > > The only possibility is that the older records have aged off and ZK > still > > > has the offset which is no longer applicable...How does the java client > > > deal with this? Does kafka inform ZK that records have been aged off > and > > > update the offset or something? > > > > > > Here is the error i see in the broker logs > > > > > > [2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing > fetch > > > request for partition [activity.stream,3] offset 8013827 from consumer > > > with > > > correlation id 73 (kafka.server.KafkaApis) > > > > > > kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but > > we > > > only have log segments in the range 8603331 to 11279773. > > > > > > at kafka.log.Log.read(Log.scala:380) > > > > > > at > > > > > > > > > kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530) > > > > > > at > > > > > > > > > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476) > > > > > > at > > > > > > > > > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471) > > > > > > at > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) > > > > > > at > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233) > > > > > > at scala.collection.immutable.Map$Map3.foreach(Map.scala:164) > > > > > > at > > > scala.collection.TraversableLike$class.map(TraversableLike.scala:233) > > > > > > at scala.collection.immutable.Map$Map3.map(Map.scala:144) > > > > > > at > > > > > > > > > kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471) > > > > > > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437) > > > > > > at kafka.server.KafkaApis.handle(KafkaApis.scala:186) > > > > > > at > > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) > > > > > > at java.lang.Thread.run(Thread.java:745) > > > > > > > > > thx > > > > > > Jim > > > > > > > > > > > -- > > -- Guozhang > > > -- -- Guozhang