[ https://issues.apache.org/jira/browse/KAFKA-725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235016#comment-15235016 ]
Stig Rohde Døssing commented on KAFKA-725:
------------------------------------------

[~guozhang] Okay, returning the error should be fine then. I can't really think of a case where this error can happen if the client is well behaved and unclean leader election is turned off. If the client never increments its offset by more than 1 past the most recently consumed message, it shouldn't be possible for it to request an offset higher than the high watermark, since the most recent offset it can have consumed is HW - 1.

> Broker Exception: Attempt to read with a maximum offset less than start offset
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-725
>                 URL: https://issues.apache.org/jira/browse/KAFKA-725
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>            Assignee: Stig Rohde Døssing
>             Fix For: 0.10.0.0
>
>
> I have a simple consumer that's reading from a single topic/partition pair.
> Running it seems to trigger these messages on the broker periodically:
> 2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka]
> [] [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset
> (7951715) less than the start offset (7951732).
> at kafka.log.LogSegment.read(LogSegment.scala:105)
> at kafka.log.Log.read(Log.scala:390)
> at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.immutable.Map$Map1.map(Map.scala:93)
> at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
> at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
> at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
> at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
> at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
> at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
> at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
> at java.lang.Thread.run(Thread.java:619)
> When I shut the consumer down, I don't see the exceptions anymore.
> This is the code that my consumer is running:
> while(true) {
>   // we believe the consumer to be connected, so try and use it for a fetch request
>   val request = new FetchRequestBuilder()
>     .addFetch(topic, partition, nextOffset, fetchSize)
>     .maxWait(Int.MaxValue)
>     // TODO for super high-throughput, might be worth waiting for more bytes
>     .minBytes(1)
>     .build
>   debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
>   val messages = connectedConsumer.fetch(request)
>   debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))
>   if (messages.hasError) {
>     warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
>     ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
>   }
>   messages.messageSet(topic, partition).foreach(msg => {
>     watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
>     nextOffset = msg.nextOffset
>   })
> }
> Any idea what might be causing this error?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
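The offset reasoning in the comment above can be made concrete with a minimal sketch. This is not Kafka's actual broker code; the class and method names (`FetchBounds`, `readIsWellFormed`, `nextFetchOffset`) are hypothetical, and it simply models the bound that the error message reports violated: a fetch read is well-formed only when the start offset (the consumer's requested offset, 7951732 in the log) does not exceed the maximum readable offset (the high watermark, 7951715).

```java
// Hypothetical sketch of the invariant: a well-behaved consumer's next fetch
// offset is at most lastConsumed + 1, and lastConsumed <= HW - 1, so the
// requested offset can never exceed HW. The broker error fires only when
// that invariant is broken (e.g. stale offsets after unclean leader election).
final class FetchBounds {
    /** A read bounded by the high watermark is well-formed only if it starts at or below it. */
    static boolean readIsWellFormed(long startOffset, long highWatermark) {
        return startOffset <= highWatermark;
    }

    /** The next offset a well-behaved consumer may request: one past the last consumed. */
    static long nextFetchOffset(long lastConsumedOffset) {
        return lastConsumedOffset + 1;
    }

    public static void main(String[] args) {
        long hw = 7951715L;             // high watermark from the broker log above
        long lastConsumed = hw - 1;     // most recent offset a consumer can have seen
        long next = nextFetchOffset(lastConsumed);
        System.out.println(readIsWellFormed(next, hw));      // true: next == HW
        System.out.println(readIsWellFormed(7951732L, hw));  // false: the offset from the log, past HW
    }
}
```

Under this model, returning an error code to the misbehaving client (rather than throwing `IllegalArgumentException` on the broker) is the defensible behavior, since the broker cannot reach this state from a well-behaved consumer alone.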