chia7712 commented on code in PR #19223:
URL: https://github.com/apache/kafka/pull/19223#discussion_r2006206448


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -333,11 +333,14 @@ abstract class AbstractFetcherThread(name: String,
         responseData.foreachEntry { (topicPartition, partitionData) =>
           Option(partitionStates.stateValue(topicPartition)).foreach { 
currentFetchState =>
             // It's possible that a partition is removed and re-added or 
truncated when there is a pending fetch request.
-            // In this case, we only want to process the fetch response if the 
partition state is ready for fetch and
-            // the current offset is the same as the offset requested.
+            // In this case, we only want to process the fetch response if:
+            // - the partition state is ready for fetch
+            // - the current offset is the same as the offset requested
+            // - the current leader epoch is the same as the leader epoch 
requested
             val fetchPartitionData = sessionPartitions.get(topicPartition)
             if (fetchPartitionData != null &&
                 fetchPartitionData.fetchOffset == 
currentFetchState.fetchOffset &&
+                fetchPartitionData.currentLeaderEpoch.map[Boolean](_ == 
currentFetchState.currentLeaderEpoch).orElse(true) &&

Review Comment:
   the line#368 can be streamlined to `currentFetchState.currentLeaderEpoch` 
due to this new condition.
   ```
                         val logAppendInfoOpt = processPartitionData(
                           topicPartition,
                           currentFetchState.fetchOffset,
                           
fetchPartitionData.currentLeaderEpoch.orElse(currentFetchState.currentLeaderEpoch),
 // this line
                           partitionData
                         )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to