Chris Riccomini created KAFKA-725:
-------------------------------------

             Summary: Broker Exception: Attempt to read with a maximum offset 
less than start offset
                 Key: KAFKA-725
                 URL: https://issues.apache.org/jira/browse/KAFKA-725
             Project: Kafka
          Issue Type: Bug
          Components: log
    Affects Versions: 0.8
            Reporter: Chris Riccomini
            Assignee: Jay Kreps


I have a simple consumer that's reading from a single topic/partition pair. 
Running it seems to trigger these messages on the broker periodically:

2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] []  
[KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
java.lang.IllegalArgumentException: Attempt to read with a maximum offset 
(7951715) less than the start offset (7951732).
        at kafka.log.LogSegment.read(LogSegment.scala:105)
        at kafka.log.Log.read(Log.scala:390)
        at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
        at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
        at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.map(Map.scala:93)
        at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
        at 
kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
        at 
kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at 
kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
        at 
kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
        at 
kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
        at java.lang.Thread.run(Thread.java:619)

When I shut the consumer down, I don't see the exceptions anymore.

This is the code that my consumer is running:
          while(true) {
            // we believe the consumer to be connected, so try and use it for a 
fetch request
            val request = new FetchRequestBuilder()
              .addFetch(topic, partition, nextOffset, fetchSize)
              .maxWait(Int.MaxValue)
              // TODO for super high-throughput, might be worth waiting for 
more bytes
              .minBytes(1)
              .build

            debug("Fetching messages for stream %s and offset %s." format 
(streamPartition, nextOffset))
            val messages = connectedConsumer.fetch(request)
            debug("Fetch complete for stream %s and offset %s. Got messages: 
%s" format (streamPartition, nextOffset, messages))
            if (messages.hasError) {
              warn("Got error code from broker for %s: %s. Shutting down 
consumer to trigger a reconnect." format (streamPartition, 
messages.errorCode(topic, partition)))
              ErrorMapping.maybeThrowException(messages.errorCode(topic, 
partition))
            }
            messages.messageSet(topic, partition).foreach(msg => {
              watchers.foreach(_.onMessagesReady(msg.offset.toString, 
msg.message.payload))
              nextOffset = msg.nextOffset
            })
          }

Any idea what might be causing this error?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to