jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689787245



##########
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##########
@@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) {
     public Set<Uuid> deletedTopicIds() {
         return deletedTopicIds;
     }
+
+    /**
+     * Find the topic partitions that have changed based on the replica given.
+     *
+     * The changes identified are:
+     *   1. topic partitions for which the broker is not a replica anymore
+     *   2. topic partitions for which the broker is now the leader
+     *   3. topic partitions for which the broker is now a follower
+     *
+     * @param brokerId the broker id
+     * @return the list of topic partitions which the broker should remove, 
become leader or become follower.
+     */
+    public LocalReplicaChanges localChanges(int brokerId) {
+        Set<TopicPartition> deletes = new HashSet<>();
+        Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new 
HashMap<>();
+        Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new 
HashMap<>();
+
+        for (TopicDelta delta : changedTopics.values()) {
+            LocalReplicaChanges changes = delta.localChanges(brokerId);
+
+            deletes.addAll(changes.deletes());
+            leaders.putAll(changes.leaders());
+            followers.putAll(changes.followers());
+        }
+
+        // Add all of the deleted topic partitions to the map of locally 
removed partitions
+        deletedTopicIds().forEach(topicId -> {
+            TopicImage topicImage = image().getTopic(topicId);
+            topicImage.partitions().keySet().forEach(partitionId -> {
+                deletes.add(new TopicPartition(topicImage.name(), 
partitionId));

Review comment:
       Yes. Fixed and updated the test in `TopicsImageTest` to check for this 
case.

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3020,34 +2949,183 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testDeltaFollowerToNotReplica(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val followerTopicsDelta = topicsCreateDelta(localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+
+      val fetcher = 
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+      assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), 
fetcher.map(_.sourceBroker))
+
+      // Apply changes that remove replica
+      val notReplicaTopicsDelta = 
topicsChangeDelta(followerMetadataImage.topics(), otherId, true)
+      val notReplicaMetadataImage = 
imageFromTopics(notReplicaTopicsDelta.apply())
+      replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+
+      // Check that the partition was removed
+      assertEquals(HostedPartition.None, 
replicaManager.getPartition(topicPartition))
+      assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+      assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaFollowerRemovedTopic(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val followerTopicsDelta = topicsCreateDelta(localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+
+      val fetcher = 
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+      assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), 
fetcher.map(_.sourceBroker))
+
+      // Apply changes that remove topic and replica
+      val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics())
+      val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
+      replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta)
+
+      // Check that the partition was removed
+      assertEquals(HostedPartition.None, 
replicaManager.getPartition(topicPartition))
+      assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+      assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaLeaderToNotReplica(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+    try {
+      // Make the local replica the leader
+      val leaderTopicsDelta = topicsCreateDelta(localId, true)
+      val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+      replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(leaderPartition) = 
replicaManager.getPartition(topicPartition)
+      assertTrue(leaderPartition.isLeader)
+      assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
+      assertEquals(0, leaderPartition.getLeaderEpoch)
+
+      assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+
+      // Apply changes that remove replica
+      val notReplicaTopicsDelta = 
topicsChangeDelta(leaderMetadataImage.topics(), otherId, true)
+      val notReplicaMetadataImage = 
imageFromTopics(notReplicaTopicsDelta.apply())
+      replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+
+      // Check that the partition was removed

Review comment:
       Good idea. Added the checks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to