[ https://issues.apache.org/jira/browse/KAFKA-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463400#comment-16463400 ]
ASF GitHub Bot commented on KAFKA-6857: --------------------------------------- hachikuji closed pull request #4967: KAFKA-6857: Leader must always reply with undefined offset if undefined leader epoch requested URL: https://github.com/apache/kafka/pull/4967 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index eab0b9c378c..220432d32c0 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -96,10 +96,13 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM override def endOffsetFor(requestedEpoch: Int): Long = { inReadLock(lock) { val offset = - if (requestedEpoch == latestEpoch) { + if (requestedEpoch == UNDEFINED_EPOCH) { + // this may happen if a bootstrapping follower sends a request with undefined epoch or + // a follower is on the older message format where leader epochs are not recorded + UNDEFINED_EPOCH_OFFSET + } else if (requestedEpoch == latestEpoch) { leo().messageOffset - } - else { + } else { val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch) if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch) UNDEFINED_EPOCH_OFFSET diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index 8460fe4c54c..4a8df11f8a3 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -143,6 +143,21 @@ class LeaderEpochFileCacheTest { assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(0)) } + @Test + def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(){ + val leo = 73 + def leoFinder() = new LogOffsetMetadata(leo) + + //Given + val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) + + //When (say a follower on older message format version) sends request for UNDEFINED_EPOCH + val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH) + + //Then + assertEquals(UNDEFINED_EPOCH_OFFSET, offsetFor) + } + @Test def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){ def leoFinder() = new LogOffsetMetadata(0) @@ -664,4 +679,4 @@ class LeaderEpochFileCacheTest { def setUp() { checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile()) } -} \ No newline at end of file +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > LeaderEpochFileCache.endOffsetFor() should check for UNDEFINED_EPOCH > explicitly > ------------------------------------------------------------------------------- > > Key: KAFKA-6857 > URL: https://issues.apache.org/jira/browse/KAFKA-6857 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 0.11.0.0 > Reporter: Jun Rao > Assignee: Anna Povzner > Priority: Major > > In LeaderEpochFileCache.endOffsetFor() , we have the following code. > > > {code:java} > if (requestedEpoch == latestEpoch) { > leo().messageOffset > {code} > > In the case when the requestedEpoch is UNDEFINED_EPOCH and latestEpoch is > also UNDEFINED_EPOCH, we return leo. This will cause the follower to truncate > to a wrong offset. If requestedEpoch is UNDEFINED_EPOCH, we need to request > UNDEFINED_EPOCH_OFFSET. -- This message was sent by Atlassian JIRA (v7.6.3#76005)