chia7712 commented on code in PR #19223: URL: https://github.com/apache/kafka/pull/19223#discussion_r2006206448
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -333,11 +333,14 @@ abstract class AbstractFetcherThread(name: String,
           responseData.foreachEntry { (topicPartition, partitionData) =>
             Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
               // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
-              // In this case, we only want to process the fetch response if the partition state is ready for fetch and
-              // the current offset is the same as the offset requested.
+              // In this case, we only want to process the fetch response if:
+              // - the partition state is ready for fetch
+              // - the current offset is the same as the offset requested
+              // - the current leader epoch is the same as the leader epoch requested
               val fetchPartitionData = sessionPartitions.get(topicPartition)
               if (fetchPartitionData != null &&
                   fetchPartitionData.fetchOffset == currentFetchState.fetchOffset &&
+                  fetchPartitionData.currentLeaderEpoch.map[Boolean](_ == currentFetchState.currentLeaderEpoch).orElse(true) &&

Review Comment:
Line 368 can be simplified to `currentFetchState.currentLeaderEpoch` because of this new condition: once the check passes, the requested leader epoch is either absent or equal to the current one, so the `orElse` fallback always yields `currentFetchState.currentLeaderEpoch`.

```
val logAppendInfoOpt = processPartitionData(
  topicPartition,
  currentFetchState.fetchOffset,
  fetchPartitionData.currentLeaderEpoch.orElse(currentFetchState.currentLeaderEpoch), // this line
  partitionData
)
```
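
The reviewer's point can be checked with a small standalone model. This is only a sketch under stated assumptions: `RequestedPartition`, `FetchState`, and `LeaderEpochGuard` are hypothetical stand-ins, not the Kafka classes, and Scala's `Option` replaces the `java.util.Optional` used in the diff to keep the example dependency-free. It models the offset and leader-epoch parts of the guard and confirms that, whenever they pass, the `orElse`-style fallback returns the same value as `currentFetchState.currentLeaderEpoch`.

```scala
// Hypothetical, simplified stand-ins for the types in the diff (not the Kafka classes).
// Assumption: Option[Int] models the Optional leader epoch carried by the fetch request.
final case class RequestedPartition(fetchOffset: Long, currentLeaderEpoch: Option[Int])
final case class FetchState(fetchOffset: Long, currentLeaderEpoch: Int)

object LeaderEpochGuard {
  // Models the guard from the diff: same offset, and the requested epoch
  // (if one was sent) equals the current epoch.
  def shouldProcess(requested: RequestedPartition, current: FetchState): Boolean =
    requested != null &&
      requested.fetchOffset == current.fetchOffset &&
      requested.currentLeaderEpoch.forall(_ == current.currentLeaderEpoch)

  // Models the argument currently passed to processPartitionData.
  def epochWithFallback(requested: RequestedPartition, current: FetchState): Int =
    requested.currentLeaderEpoch.getOrElse(current.currentLeaderEpoch)

  // Over a small grid of requests, checks that the fallback is redundant
  // for every request that passes the guard.
  def fallbackIsRedundant: Boolean = {
    val current = FetchState(fetchOffset = 100L, currentLeaderEpoch = 5)
    val requests = for {
      offset <- Seq(99L, 100L)
      epoch <- Seq(None, Some(4), Some(5))
    } yield RequestedPartition(offset, epoch)
    requests
      .filter(shouldProcess(_, current))
      .forall(epochWithFallback(_, current) == current.currentLeaderEpoch)
  }

  def main(args: Array[String]): Unit =
    println(s"fallback redundant under guard: $fallbackIsRedundant")
}
```

A request with a stale epoch (for example `Some(4)` against a current epoch of `5`) is rejected by the guard, so it never reaches the fallback expression. That is why passing `currentFetchState.currentLeaderEpoch` directly would be equivalent in this model.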