hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r534424325
##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -426,21 +451,35 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
+      currentState
+    } else if (initialFetchState.initOffset < 0) {
+      fetchOffsetAndTruncate(tp, initialFetchState.currentLeaderEpoch)
+    } else if (isTruncationOnFetchSupported) {
+      val lastFetchedEpoch = latestEpoch(tp)
+      val state = if (lastFetchedEpoch.exists(_ != EpochEndOffset.UNDEFINED_EPOCH)) Fetching else Truncating

Review comment:
Hmm... Do we actually return `Some(EpochEndOffset.UNDEFINED_EPOCH)` from `latestEpoch`? That seems surprising. It might be worth a comment noting that we still go through the `Truncating` state when the message format is old.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -64,8 +64,8 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
       fetcherThreadMap.forKeyValue { (id, thread) =>
-        val removedPartitions = thread.partitionsAndOffsets
-        removeFetcherForPartitions(removedPartitions.keySet)
+        val removedPartitions = thread.removeAllPartitions()
+        removeFetcherForPartitions(removedPartitions.keySet) // clear state for removed partitions

Review comment:
This reads a bit odd following `removeAllPartitions`. I guess what we get from `removeFetcherForPartitions` is the clearing of `failedPartitions` and de-registration from `fetcherLagStats`. Not super important, but I wonder if it's worth trying to consolidate a little. Maybe `removeFetcherForPartitions` could return the initial fetch offsets or something.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1691,6 +1692,18 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  /**
+   * From IBP 2.7 onwards, we send latest fetch epoch in the request and truncate if a
+   * diverging epoch is returned in the response, avoiding the need for a separate
+   * OffsetForLeaderEpoch request.
+   */
+  private def initialFetchOffset(log: Log): Long = {

Review comment:
I think this could be saved for a follow-up, but I wonder if we should consider similarly letting the initial offset be determined by the fetcher thread on initialization rather than being passed in. I find it confusing that we expect this to be the high watermark in some cases. It seems a little slippery the way we rely on it in `AbstractFetcherThread.truncateToHighWatermark`.
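To make the `AbstractFetcherThread.scala` comment above concrete: the following is a self-contained sketch (not the PR's code) of the state selection when truncation-on-fetch is supported. `UndefinedEpoch` stands in for `EpochEndOffset.UNDEFINED_EPOCH` (-1); a log on an old message format has no leader-epoch cache, so its latest epoch is undefined and the follower must still pass through `Truncating`.

```scala
object FetchStateSketch {
  // Stand-in for EpochEndOffset.UNDEFINED_EPOCH
  val UndefinedEpoch: Int = -1

  sealed trait ReplicaState
  case object Fetching extends ReplicaState
  case object Truncating extends ReplicaState

  // Mirrors the branch in partitionFetchState: only a defined last fetched
  // epoch lets the follower skip straight to Fetching and rely on diverging
  // epochs in fetch responses for truncation.
  def initialState(latestEpoch: Option[Int]): ReplicaState =
    if (latestEpoch.exists(_ != UndefinedEpoch)) Fetching else Truncating

  def main(args: Array[String]): Unit = {
    println(initialState(Some(5)))              // Fetching
    println(initialState(Some(UndefinedEpoch))) // Truncating: old message format
    println(initialState(None))                 // Truncating: empty epoch cache
  }
}
```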
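And a rough sketch of the consolidation floated in the `AbstractFetcherManager.scala` comment, using simplified stand-in types rather than the real `TopicPartition`/`InitialFetchState` classes: if `removeFetcherForPartitions` returned the removed fetch states, callers like `migratePartitions` would not need the separate `removeAllPartitions` call. This is a hypothetical signature, not what the PR implements.

```scala
import scala.collection.mutable

object FetcherManagerConsolidationSketch {
  // Simplified stand-ins for the real TopicPartition / InitialFetchState types.
  final case class TopicPartition(topic: String, partition: Int)
  final case class InitialFetchState(initOffset: Long, currentLeaderEpoch: Int)

  private val fetchStates = mutable.Map(
    TopicPartition("t1", 0) -> InitialFetchState(200L, 5))
  private val failedPartitions = mutable.Set.empty[TopicPartition]

  // Hypothetical consolidated operation: remove partitions from their
  // fetchers, clear the bookkeeping (failed partitions; in the real code,
  // fetcherLagStats de-registration would also happen here), and hand back
  // the removed states so the caller can re-add them to a resized pool.
  def removeFetcherForPartitions(partitions: Set[TopicPartition]): Map[TopicPartition, InitialFetchState] = {
    val removed = partitions.flatMap(tp => fetchStates.remove(tp).map(tp -> _)).toMap
    failedPartitions --= partitions
    removed
  }

  def main(args: Array[String]): Unit =
    println(removeFetcherForPartitions(Set(TopicPartition("t1", 0))))
}
```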
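The body of `initialFetchOffset` is elided in the `ReplicaManager.scala` hunk above, so here is a minimal sketch of the selection its doc comment describes, under the assumption that the choice is between the log end offset (IBP >= 2.7 with epoch information available) and the high watermark; `LogView` and the boolean parameter are simplified stand-ins:

```scala
object InitialFetchOffsetSketch {
  final case class LogView(logEndOffset: Long, highWatermark: Long, latestEpoch: Option[Int])

  def initialFetchOffset(log: LogView, truncationOnFetchSupported: Boolean): Long =
    if (truncationOnFetchSupported && log.latestEpoch.nonEmpty)
      log.logEndOffset  // truncate later via diverging epochs in fetch responses
    else
      log.highWatermark // older IBP: start at the HWM and truncate explicitly

  def main(args: Array[String]): Unit = {
    val log = LogView(logEndOffset = 200L, highWatermark = 150L, latestEpoch = Some(5))
    println(initialFetchOffset(log, truncationOnFetchSupported = true))  // 200
    println(initialFetchOffset(log, truncationOnFetchSupported = false)) // 150
  }
}
```

The high-watermark fallback here is exactly the case the review comment calls "slippery": the value passed into the fetcher is sometimes an offset, sometimes the HWM.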
##########
File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
##########
@@ -453,6 +466,107 @@ class ReplicaFetcherThreadTest {
       truncateToCapture.getValues.asScala.contains(101))
   }
 
+  @Test
+  def shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower(): Unit = {
+
+    // Create a capture to track what partitions/offsets are truncated
+    val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
+
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+    // Setup all dependencies
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val log: Log = createNiceMock(classOf[Log])
+    val partition: Partition = createNiceMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
+
+    val initialLEO = 200
+    var latestLogEpoch: Option[Int] = Some(5)
+
+    // Stubs
+    expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.highWatermark).andReturn(115).anyTimes()
+    expect(log.latestEpoch).andAnswer(() => latestLogEpoch).anyTimes()
+    expect(log.endOffsetForEpoch(4)).andReturn(Some(OffsetAndEpoch(149, 4))).anyTimes()
+    expect(log.endOffsetForEpoch(3)).andReturn(Some(OffsetAndEpoch(129, 2))).anyTimes()
+    expect(log.endOffsetForEpoch(2)).andReturn(Some(OffsetAndEpoch(119, 1))).anyTimes()
+    expect(log.logEndOffset).andReturn(initialLEO).anyTimes()
+    expect(replicaManager.localLogOrException(anyObject(classOf[TopicPartition]))).andReturn(log).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
+    expect(replicaManager.brokerTopicStats).andReturn(mock(classOf[BrokerTopicStats]))
+    stub(partition, replicaManager, log)
+
+    replay(replicaManager, logManager, quota, partition, log)
+
+    // Create the fetcher thread
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(Collections.emptyMap(), brokerEndPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) {
+      override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = None
+    }
+    thread.addPartitions(Map(t1p0 -> initialFetchState(initialLEO), t1p1 -> initialFetchState(initialLEO)))
+    val partitions = Set(t1p0, t1p1)
+
+    // Loop 1 -- both topic partitions skip epoch fetch and send fetch request since
+    // lastFetchedEpoch is set in initial fetch state.

Review comment:
nit: does this comment need an update?
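For readers following the test: below is a self-contained sketch of the truncation-offset computation it exercises, with simplified types and a hypothetical leader response. When the leader returns a diverging epoch, the follower resolves it against its own epoch cache and truncates to the smaller of the two end offsets. `localEndOffsetFor` plays the role of `Log.endOffsetForEpoch`, and the numbers mirror the test stubs above.

```scala
object DivergingEpochSketch {
  final case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)

  // localEndOffsetFor returns the end offset of the largest local epoch <=
  // the requested epoch, or None if the log has no epoch information.
  def truncationOffset(diverging: EpochEndOffset,
                       localEndOffsetFor: Int => Option[EpochEndOffset]): Option[Long] =
    localEndOffsetFor(diverging.leaderEpoch)
      .map(local => math.min(diverging.endOffset, local.endOffset))

  def main(args: Array[String]): Unit = {
    // Mirrors the test stubs: e.g. endOffsetForEpoch(3) = (epoch 2, offset 129)
    val local: Int => Option[EpochEndOffset] = {
      case 4 => Some(EpochEndOffset(4, 149))
      case 3 => Some(EpochEndOffset(2, 129))
      case 2 => Some(EpochEndOffset(1, 119))
      case _ => None
    }
    // Hypothetical leader response: divergence at epoch 3, end offset 140.
    // The follower's own end offset for that epoch range is 129, so it
    // truncates to min(140, 129) = 129.
    println(truncationOffset(EpochEndOffset(3, 140), local)) // Some(129)
  }
}
```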