hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r509482332



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String,
         val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
           case Some(offsetTruncationState) =>
            val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
+            // Resetting `lastFetchedEpoch` since we are truncating and don't expect diverging epoch in the next fetch

Review comment:
       This is a little unclear to me. I guess it is safe to reset 
`lastFetchedEpoch` as long as we reinitialize it after the next leader change. 
On the other hand, it seems safer to always retain the value.
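
For illustration, here is a minimal sketch of the two policies under discussion, using simplified stand-in types rather than the actual Kafka classes:

```scala
// Simplified stand-ins for the real Kafka types; for illustration only.
sealed trait ReplicaState
case object Fetching extends ReplicaState
case object Truncating extends ReplicaState

case class PartitionFetchState(fetchOffset: Long,
                               currentLeaderEpoch: Int,
                               lastFetchedEpoch: Option[Int],
                               state: ReplicaState)

// Policy in the patch: drop the epoch when falling back to truncation.
def resetOnTruncation(s: PartitionFetchState): PartitionFetchState =
  s.copy(lastFetchedEpoch = None, state = Truncating)

// Suggested alternative: retain the epoch; it gets reinitialized on the
// next leader change anyway, so keeping it is the more conservative choice.
def retainOnTruncation(s: PartitionFetchState): PartitionFetchState =
  s.copy(state = Truncating)
```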

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -426,21 +454,42 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty) {
+      if (currentState == null) {
+        return PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
+          state = Fetching, initialFetchState.lastFetchedEpoch)
+      }
+      // If we are in `Fetching` state we can continue to fetch regardless of the current leader epoch and truncate
+      // if necessary based on diverging epochs returned by the leader. If we are currently in Truncating state,
+      // fall through and handle based on current epoch.
+      if (currentState.state == Fetching) {
+        return currentState

Review comment:
       Is it not possible that the `InitialFetchState` has a bump to the 
current leader epoch? We will still need the latest epoch in order to continue 
fetching.
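
To make the concern concrete, a hedged sketch (simplified types; `mergedState` is a hypothetical helper, not code from the PR) of picking up a bumped leader epoch while staying in the `Fetching` state, rather than returning the stale `currentState` unchanged:

```scala
// Simplified stand-ins; not the actual Kafka classes.
case class InitialFetchState(initOffset: Long,
                             currentLeaderEpoch: Int,
                             lastFetchedEpoch: Option[Int])
case class PartitionFetchState(fetchOffset: Long,
                               currentLeaderEpoch: Int,
                               lastFetchedEpoch: Option[Int],
                               fetching: Boolean)

// Hypothetical: keep the fetch progress, but adopt an epoch bump from the
// controller so the next fetch is issued with the latest leader epoch.
def mergedState(init: InitialFetchState, current: PartitionFetchState): PartitionFetchState =
  if (current.fetching && init.currentLeaderEpoch > current.currentLeaderEpoch)
    current.copy(currentLeaderEpoch = init.currentLeaderEpoch)
  else
    current
```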

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -341,11 +352,18 @@ abstract class AbstractFetcherThread(name: String,
                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching)
+                        val newFetchState = PartitionFetchState(nextOffset, Some(lag),
+                          currentFetchState.currentLeaderEpoch, state = Fetching,
+                          Some(currentFetchState.currentLeaderEpoch))

Review comment:
       This doesn't seem right. The last fetched epoch is supposed to represent 
the epoch of the last fetched batch. The fetcher could be fetching the data 
from an older epoch here.
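
A small sketch of the distinction (the `FetchedBatch` type is a hypothetical stand-in): `lastFetchedEpoch` should come from the partition leader epoch stamped on the last batch actually appended, which may be older than the fetcher's current leader epoch:

```scala
// Hypothetical stand-in for a record batch's metadata.
case class FetchedBatch(baseOffset: Long, partitionLeaderEpoch: Int)

// Derive lastFetchedEpoch from the last appended batch, not from the
// fetcher's current leader epoch, which may be newer than the data.
def lastFetchedEpoch(appended: Seq[FetchedBatch]): Option[Int] =
  appended.lastOption.map(_.partitionLeaderEpoch)
```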

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String,
         val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
           case Some(offsetTruncationState) =>
            val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating

Review comment:
       Do we need to adjust this? I think we want to remain in the `Fetching` 
state if truncation detection is through `Fetch`.
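
Roughly, the suggestion amounts to something like the following sketch (simplified types; `truncationOnFetch` is an assumed flag mirroring `isTruncationOnFetchSupported`):

```scala
sealed trait ReplicaState
case object Fetching extends ReplicaState
case object Truncating extends ReplicaState

// If divergence is detected through Fetch responses (IBP >= 2.7), there is
// no separate truncation phase to wait out, so remain in Fetching.
def postTruncationState(truncationCompleted: Boolean, truncationOnFetch: Boolean): ReplicaState =
  if (truncationOnFetch || truncationCompleted) Fetching else Truncating
```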

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -629,7 +680,9 @@ abstract class AbstractFetcherThread(name: String,
 
       val initialLag = leaderEndOffset - offsetToFetch
       fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
-      PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, state = Fetching)
+      // We don't expect diverging epochs from the next fetch request, so resetting `lastFetchedEpoch`

Review comment:
       Again it seems safe to keep `lastFetchedEpoch` in sync with the local 
log. If we have done a full truncation above, then `lastFetchedEpoch` will be 
`None`, but otherwise it seems like we should set it.
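
A sketch of what keeping it in sync could look like (the `LocalLog` trait is a simplified stand-in; `latestEpoch` mirrors the method on Kafka's `Log`):

```scala
// Simplified stand-in for the local replica log.
trait LocalLog { def latestEpoch: Option[Int] }

// After truncation, re-derive lastFetchedEpoch from the local log: None if
// the log was fully truncated, otherwise the latest epoch still in the log.
def lastFetchedEpochAfterTruncation(log: LocalLog, fullyTruncated: Boolean): Option[Int] =
  if (fullyTruncated) None else log.latestEpoch
```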

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -770,7 +770,7 @@ class ReplicaManager(val config: KafkaConfig,
             logManager.abortAndPauseCleaning(topicPartition)
 
            val initialFetchState = InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
-              partition.getLeaderEpoch, futureLog.highWatermark)
+              partition.getLeaderEpoch, futureLog.highWatermark, lastFetchedEpoch = None)

Review comment:
       Do we need to initialize `lastFetchedEpoch`? It seems like the log may 
not be empty at this point.
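
For illustration, a sketch (simplified stand-in types, not the real `ReplicaManager` code) of seeding the future replica's fetch state from the future log itself, since the log may already contain data:

```scala
// Simplified stand-ins for the real Kafka types.
case class BrokerEndPoint(id: Int, host: String, port: Int)
case class InitialFetchState(leader: BrokerEndPoint,
                             currentLeaderEpoch: Int,
                             initOffset: Long,
                             lastFetchedEpoch: Option[Int])
trait FutureLog { def highWatermark: Long; def latestEpoch: Option[Int] }

def futureFetchState(brokerId: Int, leaderEpoch: Int, futureLog: FutureLog): InitialFetchState =
  InitialFetchState(BrokerEndPoint(brokerId, "localhost", -1),
    leaderEpoch,
    futureLog.highWatermark,
    // Initialize from the log's latest epoch instead of None, since the
    // future log may not be empty at this point.
    lastFetchedEpoch = futureLog.latestEpoch)
```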



