hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r509482332
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String,
     val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
       case Some(offsetTruncationState) =>
         val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
+        // Resetting `lastFetchedEpoch` since we are truncating and don't expect diverging epoch in the next fetch

Review comment:
       This is a little unclear to me. I guess it is safe to reset `lastFetchedEpoch` as long as we reinitialize it after the next leader change. On the other hand, it seems safer to always retain the value.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -426,21 +454,42 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }

-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty) {
+      if (currentState == null) {
+        return PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
+          state = Fetching, initialFetchState.lastFetchedEpoch)
+      }
+      // If we are in `Fetching` state can continue to fetch regardless of current leader epoch and truncate
+      // if necessary based on diverging epochs returned by the leader. If we are currently in Truncating state,
+      // fall through and handle based on current epoch.
+      if (currentState.state == Fetching) {
+        return currentState

Review comment:
       Is it not possible that the `InitialFetchState` has a bump to the current leader epoch? We will still need the latest epoch in order to continue fetching.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -341,11 +352,18 @@ abstract class AbstractFetcherThread(name: String,
                 // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
                 if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                   // Update partitionStates only if there is no exception during processPartitionData
-                  val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching)
+                  val newFetchState = PartitionFetchState(nextOffset, Some(lag),
+                    currentFetchState.currentLeaderEpoch, state = Fetching,
+                    Some(currentFetchState.currentLeaderEpoch))

Review comment:
       This doesn't seem right. The last fetched epoch is supposed to represent the epoch of the last fetched batch. The fetcher could be fetching the data from an older epoch here.
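       For illustration, a minimal sketch (not the PR's code) of deriving `lastFetchedEpoch` from the batch that was actually fetched rather than from the fetcher's current leader epoch. The helper name `epochOfLastFetchedBatch` is hypothetical; `Records.batches` and `RecordBatch.partitionLeaderEpoch` are existing Kafka APIs:

           import scala.jdk.CollectionConverters._
           import org.apache.kafka.common.record.{RecordBatch, Records}

           // Hypothetical helper: epoch of the last batch in the fetched records,
           // or None if there are no batches or the batch carries no epoch (e.g.
           // an old message format). This tracks "the epoch of the last fetched
           // batch", which can lag behind the current leader epoch.
           def epochOfLastFetchedBatch(records: Records): Option[Int] = {
             val batches = records.batches.asScala
             if (batches.isEmpty) None
             else batches.last.partitionLeaderEpoch match {
               case RecordBatch.NO_PARTITION_LEADER_EPOCH => None
               case epoch => Some(epoch)
             }
           }

       With something like this, the new fetch state could carry the fetched batch's epoch (falling back to the previous state's value when the response is empty) instead of unconditionally using `currentFetchState.currentLeaderEpoch`.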
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String,
     val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
       case Some(offsetTruncationState) =>
         val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating

Review comment:
       Do we need to adjust this? I think we want to remain in the `Fetching` state if truncation detection is through `Fetch`.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -629,7 +680,9 @@ abstract class AbstractFetcherThread(name: String,

       val initialLag = leaderEndOffset - offsetToFetch
       fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
-      PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, state = Fetching)
+      // We don't expect diverging epochs from the next fetch request, so resetting `lastFetchedEpoch`

Review comment:
       Again it seems safe to keep `lastFetchedEpoch` in sync with the local log. If we have done a full truncation above, then `lastFetchedEpoch` will be `None`, but otherwise it seems like we should set it.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -770,7 +770,7 @@ class ReplicaManager(val config: KafkaConfig,
       logManager.abortAndPauseCleaning(topicPartition)

       val initialFetchState = InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
-        partition.getLeaderEpoch, futureLog.highWatermark)
+        partition.getLeaderEpoch, futureLog.highWatermark, lastFetchedEpoch = None)

Review comment:
       Do we need to initialize `lastFetchedEpoch`? It seems like the log may not be empty at this point.
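       On that last point, a hedged sketch of seeding `lastFetchedEpoch` from the future log rather than hard-coding `None`, assuming `Log.latestEpoch: Option[Int]` (the accessor over the log's leader epoch cache):

           // Sketch only: seed `lastFetchedEpoch` from the local future log so a
           // non-empty log can use fetch-based truncation from the first request.
           val initialFetchState = InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
             partition.getLeaderEpoch, futureLog.highWatermark,
             lastFetchedEpoch = futureLog.latestEpoch)

       If the future log is empty, `latestEpoch` would be `None` and the behavior matches the patch as written; otherwise the fetcher starts with an epoch consistent with the local log.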