rreddy-22 commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103221020


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we 
can't mess with it any more in this thread
-                    val logAppendInfoOpt = 
processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) 
logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - 
nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition 
from the partitionStates after processing the partition data
-                      if (validBytes > 0 && 
partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no 
exception during processPartitionData
-                        val newFetchState = 
PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = 
Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, 
newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { 
divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new 
EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && 
FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log 
of the replica
+                      // but we don't process the partition data in order to 
not update the
+                      // low/high watermarks until the truncation is actually 
done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new 
EpochEndOffset()

Review Comment:
   The method is called at line 457/458: `truncateOnFetchResponse(divergingEndOffsets)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to