hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r504331562

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -813,8 +852,9 @@ case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {
 
   override def toString: String = "offset:%d-truncationCompleted:%b".format(offset, truncationCompleted)
 }
 
-case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) {
+case class OffsetAndEpoch(offset: Long, leaderEpoch: Int, lastFetchedEpoch: Option[Int] = None) {

Review comment:
Wondering if it might be better not to change this type, since it is used in contexts where `lastFetchedEpoch` is not relevant. Following the types through here, we first use `InitialFetchState` in `AbstractFetcherManager`:
```scala
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState])
```
We then convert to `OffsetAndEpoch`, which gets passed down to `AbstractFetcherThread`:
```scala
def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition]
```
Then this gets converted to `PartitionFetchState`. I wonder if it's possible to skip the conversion through `OffsetAndEpoch` and use `InitialFetchState` consistently? Perhaps the only reason the current code doesn't do that is that `InitialFetchState` includes the broker end point, which is not really relevant to the fetcher thread. Maybe that's not such a big deal?
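
For example, something along these lines (rough sketch only, untested; the fields mirror the existing `InitialFetchState` plus the new `lastFetchedEpoch`, and the exact shape is up for discussion):
```scala
// Hypothetical: carry InitialFetchState all the way into the fetcher thread instead of
// converting through OffsetAndEpoch. The fetcher thread would simply ignore `leader`.
case class InitialFetchState(leader: BrokerEndPoint,
                             currentLeaderEpoch: Int,
                             initOffset: Long,
                             lastFetchedEpoch: Option[Int] = None)

// AbstractFetcherThread would then accept the same type the manager already builds:
def addPartitions(initialFetchStates: Map[TopicPartition, InitialFetchState]): Set[TopicPartition]
```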

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -432,14 +455,22 @@ abstract class AbstractFetcherThread(name: String,
       failedPartitions.removeAll(initialFetchStates.keySet)
 
       initialFetchStates.forKeyValue { (tp, initialFetchState) =>
-        // We can skip the truncation step iff the leader epoch matches the existing epoch
+        // For IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+        // For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch
         val currentState = partitionStates.stateValue(tp)
-        val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
+        val updatedState = if (initialFetchState.offset >= 0 && isTruncationOnFetchSupported && initialFetchState.lastFetchedEpoch.nonEmpty) {
+          if (currentState != null)
+            currentState
+          else
+            PartitionFetchState(initialFetchState.offset, None, initialFetchState.leaderEpoch,
+              state = Fetching, initialFetchState.lastFetchedEpoch)
+        } else if (currentState != null && (currentState.currentLeaderEpoch == initialFetchState.leaderEpoch)) {

Review comment:
nit: unnecessary parentheses

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -225,6 +227,20 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  private def truncateOnFetchResponse(responseData: Map[TopicPartition, FetchData]): Unit = {
+    val epochEndOffsets = responseData
+      .filter { case (tp, fetchData) => fetchData.error == Errors.NONE && fetchData.divergingEpoch.isPresent }
+      .map { case (tp, fetchData) =>
+        val divergingEpoch = fetchData.divergingEpoch.get
+        tp -> new EpochEndOffset(Errors.NONE, divergingEpoch.epoch, divergingEpoch.endOffset)
+      }.toMap
+    inLock(partitionMapLock) {

Review comment:
Borderline overkill perhaps, but we could check if `epochEndOffsets` is non-empty before acquiring the lock.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -225,6 +227,20 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  private def truncateOnFetchResponse(responseData: Map[TopicPartition, FetchData]): Unit = {
+    val epochEndOffsets = responseData

Review comment:
Rather than doing an additional pass over the response partitions, would it be reasonable to build `epochEndOffsets` inline with the other error handling in `processFetchRequest`?
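
If it turns out to be simpler to keep the helper rather than folding this into `processFetchRequest`, a rough sketch of the cheaper shape (untested; the body under the lock is elided here):
```scala
private def truncateOnFetchResponse(responseData: Map[TopicPartition, FetchData]): Unit = {
  // Single pass instead of filter + map, and skip the lock when nothing diverged.
  val epochEndOffsets = responseData.collect {
    case (tp, fetchData) if fetchData.error == Errors.NONE && fetchData.divergingEpoch.isPresent =>
      val divergingEpoch = fetchData.divergingEpoch.get
      tp -> new EpochEndOffset(Errors.NONE, divergingEpoch.epoch, divergingEpoch.endOffset)
  }

  if (epochEndOffsets.nonEmpty) {
    inLock(partitionMapLock) {
      // same handling of the diverging offsets as in the current patch
    }
  }
}
```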

##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long,
    * Case E: This broker is the leader, but the requested epoch is now fenced
    * Case F: The fetch offset locates not on the last segment of the log
    * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+   * Case H: A diverging epoch was found, return response to trigger truncation

Review comment:
Good catch here and in `FetchSession`. Do you think we should consider doing these fixes separately so that we can get them into 2.7? Otherwise it might be difficult to tie this behavior to the 2.7 IBP.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -432,14 +455,22 @@ abstract class AbstractFetcherThread(name: String,
       failedPartitions.removeAll(initialFetchStates.keySet)
 
       initialFetchStates.forKeyValue { (tp, initialFetchState) =>
-        // We can skip the truncation step iff the leader epoch matches the existing epoch
+        // For IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+        // For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch
         val currentState = partitionStates.stateValue(tp)
-        val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
+        val updatedState = if (initialFetchState.offset >= 0 && isTruncationOnFetchSupported && initialFetchState.lastFetchedEpoch.nonEmpty) {
+          if (currentState != null)
+            currentState

Review comment:
Below we only use `currentState` if the current epoch matches the initial epoch. Why is it safe to skip that check here?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -408,9 +428,12 @@ abstract class AbstractFetcherThread(name: String,
   def markPartitionsForTruncation(topicPartition: TopicPartition, truncationOffset: Long): Unit = {
     partitionMapLock.lockInterruptibly()
     try {
+      // It is safe to reset `lastFetchedEpoch` here since we don't expect diverging offsets

Review comment:
Can be done separately, but it would be nice to figure out how to move this logic into `ReplicaAlterLogDirsManager`, since this comment seems to only make sense if we assume this is the log dir fetcher and reconciliation with the leader has already completed. In fact, I wonder if it is possible to get rid of this code entirely. If the log dir fetcher is also tracking `lastFetchedEpoch`, then we could rely on detecting truncation dynamically through `ReplicaManager.fetchMessages` instead of the current somewhat clumsy coordination with the replica fetcher.
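
To sketch the direction I mean (rough and untested; `futureLog` and the helper name are placeholders for however the log dir fetcher would wire this up, not code from this patch):
```scala
// Illustrative only: if the log dir fetcher seeds lastFetchedEpoch from the future replica's
// log, a local fetch served by ReplicaManager.fetchMessages reports a divergingEpoch whenever
// the future log has diverged, the generic truncate-on-fetch path handles the truncation, and
// the explicit markPartitionsForTruncation coordination (including this reset) could go away.
private def initialFutureReplicaFetchState(futureLog: Log, currentLeaderEpoch: Int): PartitionFetchState =
  PartitionFetchState(
    futureLog.logEndOffset,    // the log dir fetcher starts from the future log's end offset
    None,
    currentLeaderEpoch,
    state = Fetching,
    futureLog.latestEpoch)     // keep tracking lastFetchedEpoch instead of resetting it
```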