hachikuji commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103436599


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()

Review Comment:
   I was wondering about that too. We seem to batch together the truncating partitions, but I'm not sure there is any benefit. That said, it seems a little safer not to assume that `processPartitionData` will still be valid in the presence of divergence.
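
   For reference, a rough sketch of the non-batched alternative (hypothetical; it assumes the existing `truncateOnFetchResponse` can safely be invoked for a single partition while we are still iterating the fetch response, which this PR does not actually do):

   ```scala
   if (leader.isTruncationOnFetchSupported) {
     FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
       // Truncate this partition immediately instead of accumulating it into
       // `divergingEndOffsets` for a batched truncation after the loop.
       truncateOnFetchResponse(Map(topicPartition -> new EpochEndOffset()
         .setPartition(topicPartition.partition)
         .setErrorCode(Errors.NONE.code)
         .setLeaderEpoch(divergingEpoch.epoch)
         .setEndOffset(divergingEpoch.endOffset)))
     }
   }
   ```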


