[ 
https://issues.apache.org/jira/browse/KAFKA-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463400#comment-16463400
 ] 

ASF GitHub Bot commented on KAFKA-6857:
---------------------------------------

hachikuji closed pull request #4967: KAFKA-6857: Leader must always reply with 
undefined offset if undefined leader epoch requested
URL: https://github.com/apache/kafka/pull/4967
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala 
b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index eab0b9c378c..220432d32c0 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -96,10 +96,13 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, 
leo: () => LogOffsetM
   override def endOffsetFor(requestedEpoch: Int): Long = {
     inReadLock(lock) {
       val offset =
-        if (requestedEpoch == latestEpoch) {
+        if (requestedEpoch == UNDEFINED_EPOCH) {
+          // this may happen if a bootstrapping follower sends a request with 
undefined epoch or
+          // a follower is on the older message format where leader epochs are 
not recorded
+          UNDEFINED_EPOCH_OFFSET
+        } else if (requestedEpoch == latestEpoch) {
           leo().messageOffset
-        }
-        else {
+        } else {
           val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch)
           if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
             UNDEFINED_EPOCH_OFFSET
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index 8460fe4c54c..4a8df11f8a3 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -143,6 +143,21 @@ class LeaderEpochFileCacheTest {
     assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(0))
   }
 
+  @Test
+  def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(){
+    val leo = 73
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When (say a follower on older message format version) sends request for 
UNDEFINED_EPOCH
+    val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
+
+    //Then
+    assertEquals(UNDEFINED_EPOCH_OFFSET, offsetFor)
+  }
+
   @Test
   def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){
     def leoFinder() = new LogOffsetMetadata(0)
@@ -664,4 +679,4 @@ class LeaderEpochFileCacheTest {
   def setUp() {
     checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile())
   }
-}
\ No newline at end of file
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> LeaderEpochFileCache.endOffsetFor() should check for UNDEFINED_EPOCH 
> explicitly
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-6857
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6857
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.11.0.0
>            Reporter: Jun Rao
>            Assignee: Anna Povzner
>            Priority: Major
>
> In LeaderEpochFileCache.endOffsetFor() , we have the following code.
>  
>  
> {code:java}
> if (requestedEpoch == latestEpoch) {
>  leo().messageOffset
> {code}
>  
> In the case when the requestedEpoch is UNDEFINED_EPOCH and latestEpoch is 
> also UNDEFINED_EPOCH, we return leo. This will cause the follower to truncate 
> to a wrong offset. If requestedEpoch is UNDEFINED_EPOCH, we need to request 
> UNDEFINED_EPOCH_OFFSET.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to