dajac commented on a change in pull request #11294:
URL: https://github.com/apache/kafka/pull/11294#discussion_r704137955



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3242,6 +3260,92 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = 
Mockito.mock(classOf[ReplicaFetcherManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+    )
+
+    try {
+      // The first call to removeFetcherForPartitions should be ignored.
+      Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+        Set(topicPartition))
+      ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+      // Make the local replica the follower
+      var followerTopicsDelta = topicsCreateDelta(localId, false)
+      var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition
+      val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+      assertEquals(0, followerPartition.localLogOrException.logEndOffset)
+
+      // Verify that addFetcherForPartitions was called with the correct
+      // init offset.
+      Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+        .addFetcherForPartitions(
+          Map(topicPartition -> InitialFetchState(
+            leader = BrokerEndPoint(otherId, "localhost", 9093),
+            currentLeaderEpoch = 0,
+            initOffset = 0
+          ))
+        )
+
+      // The second call to removeFetcherForPartitions simulates the case
+      // where the fetcher writes to the log before being shut down.
+      Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+        Set(topicPartition))
+      ).thenAnswer { _ =>
+        replicaManager.getPartition(topicPartition) match {
+          case HostedPartition.Online(partition) =>
+            partition.appendRecordsToFollowerOrFutureReplica(
+              records = MemoryRecords.withRecords(CompressionType.NONE, 0,
+                new SimpleRecord("first message".getBytes)),
+              isFuture = false
+            )
+
+          case _ =>
+        }
+
+        Map.empty[TopicPartition, PartitionFetchState]
+      }
+
+      // Apply changes that bump the leader epoch.
+      followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), 
localId, false)
+      followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      assertFalse(followerPartition.isLeader)
+      assertEquals(1, followerPartition.getLeaderEpoch)
+      assertEquals(1, followerPartition.localLogOrException.logEndOffset)
+
+      // Verify that addFetcherForPartitions was called with the correct
+      // init offset.
+      Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+        .addFetcherForPartitions(
+          Map(topicPartition -> InitialFetchState(
+            leader = BrokerEndPoint(otherId, "localhost", 9093),
+            currentLeaderEpoch = 1,
+            initOffset = 1
+          ))
+        )
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)

Review comment:
       I was also thinking about this, as most of the tests have it, but I
wanted to test/refactor this separately, if you don't mind.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to