hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r534424325



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -426,21 +451,35 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
+      currentState
+    } else if (initialFetchState.initOffset < 0) {
+      fetchOffsetAndTruncate(tp, initialFetchState.currentLeaderEpoch)
+    } else if (isTruncationOnFetchSupported) {
+      val lastFetchedEpoch = latestEpoch(tp)
+      val state = if (lastFetchedEpoch.exists(_ != EpochEndOffset.UNDEFINED_EPOCH)) Fetching else Truncating

Review comment:
       Hmm.. Do we actually return `Some(EpochEndOffset.UNDEFINED_EPOCH)` from `latestEpoch`? That seems surprising.
   
   It might be worth a comment here that we still go through the `Truncating` state when the message format is old.
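
For illustration only, the check and comment being asked for might look like the following self-contained sketch (names simplified and invented here, not taken from the PR), assuming `latestEpoch` yields `None` rather than `Some(EpochEndOffset.UNDEFINED_EPOCH)` when the log carries no epoch information:

```scala
// Sketch of the suggestion above: treat "no epoch recorded in the log" as the
// old message format, and keep routing those partitions through Truncating
// even though the protocol supports truncation on fetch.
object TruncationStateSketch {
  sealed trait ReplicaState
  case object Fetching extends ReplicaState
  case object Truncating extends ReplicaState

  def initialState(lastFetchedEpoch: Option[Int]): ReplicaState =
    lastFetchedEpoch match {
      // Old message format: the log has no leader epochs, so divergence cannot
      // be detected from fetch responses and we still go through Truncating.
      case None    => Truncating
      case Some(_) => Fetching
    }

  def main(args: Array[String]): Unit = {
    assert(initialState(Some(5)) == Fetching)
    assert(initialState(None) == Truncating)
  }
}
```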

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -64,8 +64,8 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
       fetcherThreadMap.forKeyValue { (id, thread) =>
-        val removedPartitions = thread.partitionsAndOffsets
-        removeFetcherForPartitions(removedPartitions.keySet)
+        val removedPartitions = thread.removeAllPartitions()
+        removeFetcherForPartitions(removedPartitions.keySet) // clear state for removed partitions

Review comment:
       This reads a bit odd following `removeAllPartitions`. I guess what we get from `removeFetcherForPartitions` is the clearing of `failedPartitions` and de-registration from `fetcherLagStats`. Not super important, but I wonder if it's worth trying to consolidate a little. Maybe `removeFetcherForPartitions` could return the initial fetch offsets or something.
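
To make the consolidation idea concrete, here is a rough, self-contained sketch (all names are hypothetical and simplified, not the existing `AbstractFetcherManager` API) in which the removal path hands back the fetch state it cleared, so a thread-pool resize can simply re-add what it removed:

```scala
// Hypothetical consolidation sketch: removeFetcherForPartitions returns the
// state of the partitions it removed, instead of requiring a separate
// removeAllPartitions call beforehand.
final case class InitialFetchState(initOffset: Long, currentLeaderEpoch: Int)

class FetcherManagerSketch {
  // Keyed by a "topic-partition" string to keep the sketch dependency-free.
  private var fetchStates = Map.empty[String, InitialFetchState]

  def addFetcherForPartitions(states: Map[String, InitialFetchState]): Unit =
    fetchStates ++= states

  // Returns the removed fetch state; clearing of failed partitions and
  // de-registration from lag metrics is elided in this sketch.
  def removeFetcherForPartitions(partitions: Set[String]): Map[String, InitialFetchState] = {
    val removed = fetchStates.filter { case (tp, _) => partitions.contains(tp) }
    fetchStates --= partitions
    removed
  }

  // During a resize, re-add exactly what was removed in one round trip.
  def migratePartitions(partitions: Set[String]): Unit =
    addFetcherForPartitions(removeFetcherForPartitions(partitions))
}
```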

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1691,6 +1692,18 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  /**
+   * From IBP 2.7 onwards, we send latest fetch epoch in the request and truncate if a
+   * diverging epoch is returned in the response, avoiding the need for a separate
+   * OffsetForLeaderEpoch request.
+   */
+  private def initialFetchOffset(log: Log): Long = {

Review comment:
       I think this could be saved for a follow-up, but I wonder if we should consider similarly letting the initial offset be determined by the fetcher thread on initialization rather than being passed in. I find it confusing that we expect this to be the high watermark in some cases. It seems a little slippery the way we rely on it in `AbstractFetcherThread.truncateToHighWatermark`.
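
For reference, the selection the javadoc describes could be sketched as below (a guess at the shape of the logic, using simplified stand-in types rather than the PR's actual body): when truncation on fetch is supported and the log has epoch information, the follower can start from its log end offset and let diverging-epoch responses drive truncation; otherwise it falls back to the high watermark.

```scala
object InitialFetchOffsetSketch {
  // Stand-in for Log with just the fields this decision needs.
  final case class LogSketch(logEndOffset: Long, highWatermark: Long, latestEpoch: Option[Int])

  def initialFetchOffset(log: LogSketch, truncationOnFetchSupported: Boolean): Long =
    if (truncationOnFetchSupported && log.latestEpoch.nonEmpty)
      log.logEndOffset // IBP >= 2.7: diverging epochs in fetch responses drive truncation
    else
      log.highWatermark // older IBP or no epoch info: rely on truncateToHighWatermark

  def main(args: Array[String]): Unit = {
    // With epoch information and IBP >= 2.7, fetching starts at the log end offset.
    assert(initialFetchOffset(LogSketch(200, 115, Some(5)), truncationOnFetchSupported = true) == 200)
    assert(initialFetchOffset(LogSketch(200, 115, None), truncationOnFetchSupported = false) == 115)
  }
}
```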

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
##########
@@ -453,6 +466,107 @@ class ReplicaFetcherThreadTest {
                truncateToCapture.getValues.asScala.contains(101))
   }
 
+  @Test
+  def shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower(): Unit = {
+
+    // Create a capture to track what partitions/offsets are truncated
+    val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
+
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+    // Setup all dependencies
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val log: Log = createNiceMock(classOf[Log])
+    val partition: Partition = createNiceMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
+
+    val initialLEO = 200
+    var latestLogEpoch: Option[Int] = Some(5)
+
+    // Stubs
+    expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.highWatermark).andReturn(115).anyTimes()
+    expect(log.latestEpoch).andAnswer(() => latestLogEpoch).anyTimes()
+    expect(log.endOffsetForEpoch(4)).andReturn(Some(OffsetAndEpoch(149, 4))).anyTimes()
+    expect(log.endOffsetForEpoch(3)).andReturn(Some(OffsetAndEpoch(129, 2))).anyTimes()
+    expect(log.endOffsetForEpoch(2)).andReturn(Some(OffsetAndEpoch(119, 1))).anyTimes()
+    expect(log.logEndOffset).andReturn(initialLEO).anyTimes()
+    expect(replicaManager.localLogOrException(anyObject(classOf[TopicPartition]))).andReturn(log).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
+    expect(replicaManager.brokerTopicStats).andReturn(mock(classOf[BrokerTopicStats]))
+    stub(partition, replicaManager, log)
+
+    replay(replicaManager, logManager, quota, partition, log)
+
+    // Create the fetcher thread
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(Collections.emptyMap(), brokerEndPoint, new SystemTime())
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, failedPartitions, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork)) {
+      override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = None
+    }
+    thread.addPartitions(Map(t1p0 -> initialFetchState(initialLEO), t1p1 -> initialFetchState(initialLEO)))
+    val partitions = Set(t1p0, t1p1)
+
+    // Loop 1 -- both topic partitions skip epoch fetch and send fetch request since
+    // lastFetchedEpoch is set in initial fetch state.

Review comment:
       nit: needs update?



