junrao commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689659905



##########
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##########
@@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) {
     public Set<Uuid> deletedTopicIds() {
         return deletedTopicIds;
     }
+
+    /**
+     * Find the topic partitions that have change base on the replica given.

Review comment:
       base => based

##########
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##########
@@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) {
     public Set<Uuid> deletedTopicIds() {
         return deletedTopicIds;
     }
+
+    /**
+     * Find the topic partitions that have change base on the replica given.
+     *
+     * The changes identified are:
+     *   1. topic partitions for which the broker is not a replica anymore
+     *   2. topic partitions for which the broker is now the leader
+     *   3. topic partitions for which the broker is now a follower
+     *
+     * @param brokerId the broker id
+     * @return the list of topic partitions which the broker should remove, 
become leader or become follower.
+     */
+    public LocalReplicaChanges localChanges(int brokerId) {
+        Set<TopicPartition> deletes = new HashSet<>();
+        Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new 
HashMap<>();
+        Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new 
HashMap<>();
+
+        for (TopicDelta delta : changedTopics.values()) {
+            LocalReplicaChanges changes = delta.localChanges(brokerId);
+
+            deletes.addAll(changes.deletes());
+            leaders.putAll(changes.leaders());
+            followers.putAll(changes.followers());
+        }
+
+        // Add all of the deleted topic partitions to the map of locally 
removed partitions
+        deletedTopicIds().forEach(topicId -> {
+            TopicImage topicImage = image().getTopic(topicId);
+            topicImage.partitions().keySet().forEach(partitionId -> {
+                deletes.add(new TopicPartition(topicImage.name(), 
partitionId));

Review comment:
       Should we further check that the deleted partition has a replica on 
brokerId before adding it to deletes?

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3020,34 +2949,183 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testDeltaFollowerToNotReplica(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val followerTopicsDelta = topicsCreateDelta(localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+
+      val fetcher = 
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+      assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), 
fetcher.map(_.sourceBroker))
+
+      // Apply changes that remove replica
+      val notReplicaTopicsDelta = 
topicsChangeDelta(followerMetadataImage.topics(), otherId, true)
+      val notReplicaMetadataImage = 
imageFromTopics(notReplicaTopicsDelta.apply())
+      replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+
+      // Check that the partition was removed
+      assertEquals(HostedPartition.None, 
replicaManager.getPartition(topicPartition))
+      assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+      assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaFollowerRemovedTopic(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val followerTopicsDelta = topicsCreateDelta(localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+
+      val fetcher = 
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+      assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), 
fetcher.map(_.sourceBroker))
+
+      // Apply changes that remove topic and replica
+      val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics())
+      val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
+      replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta)
+
+      // Check that the partition was removed
+      assertEquals(HostedPartition.None, 
replicaManager.getPartition(topicPartition))
+      assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+      assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaLeaderToNotReplica(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val leaderTopicsDelta = topicsCreateDelta(localId, true)
+      val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+      replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(leaderPartition) = 
replicaManager.getPartition(topicPartition)
+      assertTrue(leaderPartition.isLeader)
+      assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
+      assertEquals(0, leaderPartition.getLeaderEpoch)
+
+      assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+
+      // Apply changes that remove replica
+      val notReplicaTopicsDelta = 
topicsChangeDelta(leaderMetadataImage.topics(), otherId, true)
+      val notReplicaMetadataImage = 
imageFromTopics(notReplicaTopicsDelta.apply())
+      replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+
+      // Check that the partition was removed

Review comment:
       In RaftClusterTest.testCreateClusterAndPerformReassignment(), should we 
add the same check to verify that the old replicas are removed after 
reassignment?

##########
File path: metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
##########
@@ -93,43 +93,49 @@ public TopicImage apply() {
     }
 
     /**
-     * Find the partitions that we are now leading, whose partition epoch has 
changed.
+     * Find the partitions that have change base on the replica given.

Review comment:
       base => based




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to