[ https://issues.apache.org/jira/browse/KAFKA-725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15223457#comment-15223457 ]
ASF GitHub Bot commented on KAFKA-725:
--------------------------------------

GitHub user srdo reopened a pull request:

    https://github.com/apache/kafka/pull/1178

    KAFKA-725: Change behavior of Log/LogSegment when attempting read on an offset that's above high watermark.

This should make Log.read behave the same when startOffset is larger than maxOffset as it does when startOffset is larger than logEndOffset. The current behavior can result in an IllegalArgumentException from LogSegment if a consumer attempts to fetch an offset that is present in the leader's log but above the high watermark. It seems more correct for Log.read to present the log to consumers as if it simply ended at maxOffset (the high watermark). I've tried to describe an example scenario of this happening here: https://issues.apache.org/jira/browse/KAFKA-725?focusedCommentId=15221673&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15221673

I'm not sure I understand why ReplicaManager sets maxOffset to the high watermark rather than to high watermark + 1. Isn't the high watermark the last committed message, and therefore readable by consumers?

Tests passed for me locally on the second try; it seems the first run just hit a flaky test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/kafka KAFKA-725

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1178.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1178

----
commit c7bab99b77b71c73380d473facda1138799e42a6
Author: Stig Rohde Døssing <s...@it-minds.dk>
Date:   2016-04-02T12:20:50Z

    KAFKA-725: Throw OffsetOutOfRangeException when reading from Log with maxOffset > startOffset

commit 4f5b415651ec45d3040c22393d24293de4f2cfd0
Author: Stig Rohde Døssing <s...@it-minds.dk>
Date:   2016-04-02T23:29:02Z

    KAFKA-725: Put check for HW from consumer in ReplicaManager.readFromLocalLog instead of Log.read

----
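To make the intended semantics concrete, here is a minimal, self-contained Scala sketch of the guard the pull request describes. It is illustrative only: the object name, the stand-in OffsetOutOfRangeException class, the read signature, and the hard-coded offsets are assumptions for the example, not the actual Kafka internals.

    object ReadGuardSketch {
      // Stand-in for Kafka's OffsetOutOfRangeException; not the real class.
      class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)

      // Pretend replica state, using the offsets from the original report:
      // the log end offset and the high watermark.
      val logEndOffset: Long = 7951732L
      val highWatermark: Long = 7951715L

      // For an ordinary consumer, maxOffset is the high watermark. The idea of
      // the patch: an offset above maxOffset is treated exactly like an offset
      // beyond the end of the log, instead of tripping an
      // IllegalArgumentException deeper inside LogSegment.read.
      def read(startOffset: Long, maxOffset: Option[Long]): Seq[Long] = {
        maxOffset.filter(_ < startOffset).foreach { hw =>
          throw new OffsetOutOfRangeException(
            s"Request for offset $startOffset, but the readable log ends at $hw.")
        }
        // Dummy "messages": just the next few offsets up to the log end.
        (startOffset until math.min(startOffset + 10, logEndOffset)).toSeq
      }

      def main(args: Array[String]): Unit = {
        // A fetch above the high watermark now yields the same out-of-range
        // error a consumer already handles for offsets beyond the log end.
        try read(7951732L, Some(highWatermark))
        catch { case e: OffsetOutOfRangeException => println(e.getMessage) }
      }
    }

Consumers already handle this error code (the fetch loop quoted below maps it via ErrorMapping.maybeThrowException), so surfacing it instead of an IllegalArgumentException keeps the failure on the client-visible path.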
> Broker Exception: Attempt to read with a maximum offset less than start offset
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-725
>                 URL: https://issues.apache.org/jira/browse/KAFKA-725
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>            Assignee: Jay Kreps
>
> I have a simple consumer that's reading from a single topic/partition pair. Running it seems to trigger these messages on the broker periodically:
>
> 2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] [] [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732).
>         at kafka.log.LogSegment.read(LogSegment.scala:105)
>         at kafka.log.Log.read(Log.scala:390)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.map(Map.scala:93)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>         at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:619)
>
> When I shut the consumer down, I don't see the exceptions anymore.
>
> This is the code that my consumer is running:
>
> while(true) {
>   // we believe the consumer to be connected, so try and use it for a fetch request
>   val request = new FetchRequestBuilder()
>     .addFetch(topic, partition, nextOffset, fetchSize)
>     .maxWait(Int.MaxValue)
>     // TODO for super high-throughput, might be worth waiting for more bytes
>     .minBytes(1)
>     .build
>
>   debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
>   val messages = connectedConsumer.fetch(request)
>   debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))
>
>   if (messages.hasError) {
>     warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
>     ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
>   }
>
>   messages.messageSet(topic, partition).foreach(msg => {
>     watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
>     nextOffset = msg.nextOffset
>   })
> }
>
> Any idea what might be causing this error?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)