Chris Riccomini created KAFKA-725:
-------------------------------------

             Summary: Broker Exception: Attempt to read with a maximum offset less than start offset
                 Key: KAFKA-725
                 URL: https://issues.apache.org/jira/browse/KAFKA-725
             Project: Kafka
          Issue Type: Bug
          Components: log
    Affects Versions: 0.8
            Reporter: Chris Riccomini
            Assignee: Jay Kreps

I have a simple consumer that's reading from a single topic/partition pair. Running it seems to trigger these messages on the broker periodically:

2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] [] [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732).
        at kafka.log.LogSegment.read(LogSegment.scala:105)
        at kafka.log.Log.read(Log.scala:390)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.map(Map.scala:93)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
        at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
        at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
        at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
        at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
        at java.lang.Thread.run(Thread.java:619)

When I shut the consumer down, I don't see the exceptions anymore.

This is the code that my consumer is running:

    while(true) {
      // we believe the consumer to be connected, so try and use it for a fetch request
      val request = new FetchRequestBuilder()
        .addFetch(topic, partition, nextOffset, fetchSize)
        .maxWait(Int.MaxValue)
        // TODO for super high-throughput, might be worth waiting for more bytes
        .minBytes(1)
        .build

      debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
      val messages = connectedConsumer.fetch(request)
      debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))

      if (messages.hasError) {
        warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
        ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
      }

      messages.messageSet(topic, partition).foreach(msg => {
        watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
        nextOffset = msg.nextOffset
      })
    }

Any idea what might be causing this error?
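
For reference, the condition named in the exception message can be reproduced in isolation. This is only a toy sketch inferred from the error text: `ReadBoundsSketch`, `checkedRead`, and `rejection` are hypothetical names, not Kafka's code, and the sketch says nothing about why the broker ends up with a maximum offset below the requested start offset.

```scala
// Toy model only: hypothetical names, not Kafka's LogSegment implementation.
object ReadBoundsSketch {
  // Assumed precondition, taken from the exception text: a read that is
  // bounded by a maximum offset may not start past that bound.
  def checkedRead(startOffset: Long, maxOffset: Option[Long]): String = {
    maxOffset.foreach { max =>
      if (max < startOffset)
        throw new IllegalArgumentException(
          "Attempt to read with a maximum offset (%d) less than the start offset (%d)."
            .format(max, startOffset))
    }
    "read from offset %d".format(startOffset)
  }

  // Returns the exception message if the read is rejected, None otherwise.
  def rejection(startOffset: Long, maxOffset: Option[Long]): Option[String] =
    try { checkedRead(startOffset, maxOffset); None }
    catch { case e: IllegalArgumentException => Some(e.getMessage) }
}
```

With the values from the log line, `ReadBoundsSketch.rejection(7951732L, Some(7951715L))` yields the same message the broker printed, while a start offset at or below the bound is accepted.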