[ https://issues.apache.org/jira/browse/KAFKA-725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15223457#comment-15223457 ]

ASF GitHub Bot commented on KAFKA-725:
--------------------------------------

GitHub user srdo reopened a pull request:

    https://github.com/apache/kafka/pull/1178

    KAFKA-725: Change behavior of Log/LogSegment when attempting a read at an offset above the high watermark.

    This should make Log.read behave the same when startOffset is larger than maxOffset as it does when startOffset is larger than logEndOffset. The current behavior can result in an IllegalArgumentException from LogSegment if a consumer attempts to fetch an offset that is above the high watermark but present in the leader's log. It seems more correct for Log.read to present the log to consumers as if it simply ended at maxOffset (the high watermark).
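
    To make the failure mode concrete, here is a minimal Scala sketch (a stand-in, not the actual LogSegment code) that plugs in the two offsets from the stack trace quoted below. ReplicaManager hands the high watermark down as maxOffset, and the argument check in LogSegment.read rejects the read as soon as startOffset is above it:

        // Hypothetical reproduction; both offsets are taken from the error
        // message in this ticket's stack trace.
        val highWatermark = 7951715L  // becomes maxOffset for the read
        val fetchOffset   = 7951732L  // startOffset requested by the consumer

        // Roughly what the guard in LogSegment.read amounts to:
        if (fetchOffset > highWatermark)
          throw new IllegalArgumentException(
            s"Attempt to read with a maximum offset ($highWatermark) less than the start offset ($fetchOffset).")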
    
    I've tried to describe an example scenario of this happening here: https://issues.apache.org/jira/browse/KAFKA-725?focusedCommentId=15221673&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15221673
    
    I'm not sure I understand why ReplicaManager sets maxOffset to the high watermark rather than the high watermark + 1. Isn't the high watermark the offset of the last committed message, and therefore readable by consumers?
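
    For illustration only, assuming maxOffset is an exclusive upper bound on the read (which is exactly what the question above hinges on): with hw = 9 and offsets 0..9 committed, the two candidate bounds differ by exactly one message.

        val hw = 9L                  // hypothetical high watermark
        val boundUsedToday = hw      // maxOffset = HW: the message at offset 9 is not returned
        val boundSuggested = hw + 1  // maxOffset = HW + 1: the message at offset 9 is returned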
    
    Tests passed for me locally on the second try; it seems the first run just hit a flaky test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/kafka KAFKA-725

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1178.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1178
    
----
commit c7bab99b77b71c73380d473facda1138799e42a6
Author: Stig Rohde Døssing <s...@it-minds.dk>
Date:   2016-04-02T12:20:50Z

    KAFKA-725: Throw OffsetOutOfRangeException when reading from Log with startOffset > maxOffset

commit 4f5b415651ec45d3040c22393d24293de4f2cfd0
Author: Stig Rohde Døssing <s...@it-minds.dk>
Date:   2016-04-02T23:29:02Z

    KAFKA-725: Put check for HW from consumer in ReplicaManager.readFromLocalLog instead of Log.read

----
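
A minimal, self-contained sketch of the idea in the second commit (stand-in types and names, not Kafka's actual classes): readFromLocalLog bounds consumer fetches by the high watermark, so a read past the HW surfaces as an OffsetOutOfRangeException instead of tripping the IllegalArgumentException inside LogSegment:

    class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)

    // Stand-in for kafka.log.Log: just enough to show the bounds check.
    case class StubLog(highWatermark: Long, logEndOffset: Long) {
      def read(startOffset: Long, maxOffset: Option[Long]): String =
        s"messages from $startOffset up to ${maxOffset.getOrElse(logEndOffset)}"
    }

    // Consumers (fetchOnlyCommitted = true) see the log as if it ended at
    // the high watermark; replica fetchers read up to the log end.
    def readFromLocalLog(log: StubLog, startOffset: Long,
                         fetchOnlyCommitted: Boolean): String = {
      val maxOffsetOpt = if (fetchOnlyCommitted) Some(log.highWatermark) else None
      maxOffsetOpt match {
        case Some(hw) if startOffset > hw =>
          throw new OffsetOutOfRangeException(
            s"Fetch offset $startOffset is above the high watermark $hw")
        case _ => log.read(startOffset, maxOffsetOpt)
      }
    }

    // With the ticket's offsets (log end offset 7951740L is made up), a
    // consumer fetch past the HW now fails cleanly:
    //   readFromLocalLog(StubLog(7951715L, 7951740L), 7951732L, fetchOnlyCommitted = true)
    // throws OffsetOutOfRangeException instead of IllegalArgumentException.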


> Broker Exception: Attempt to read with a maximum offset less than start offset
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-725
>                 URL: https://issues.apache.org/jira/browse/KAFKA-725
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>            Assignee: Jay Kreps
>
> I have a simple consumer that's reading from a single topic/partition pair. 
> Running it seems to trigger these messages on the broker periodically:
> 2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] []  [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732).
>         at kafka.log.LogSegment.read(LogSegment.scala:105)
>         at kafka.log.Log.read(Log.scala:390)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
>         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.Map$Map1.map(Map.scala:93)
>         at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
>         at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>         at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
>         at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:619)
> When I shut the consumer down, I don't see the exceptions anymore.
> This is the code that my consumer is running:
>           while(true) {
>             // we believe the consumer to be connected, so try and use it for a fetch request
>             val request = new FetchRequestBuilder()
>               .addFetch(topic, partition, nextOffset, fetchSize)
>               .maxWait(Int.MaxValue)
>               // TODO for super high-throughput, might be worth waiting for more bytes
>               .minBytes(1)
>               .build
>             debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
>             val messages = connectedConsumer.fetch(request)
>             debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))
>             if (messages.hasError) {
>               warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
>               ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
>             }
>             messages.messageSet(topic, partition).foreach(msg => {
>               watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
>               nextOffset = msg.nextOffset
>             })
>           }
> Any idea what might be causing this error?


