jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689787245
########## File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java ########## @@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) { public Set<Uuid> deletedTopicIds() { return deletedTopicIds; } + + /** + * Find the topic partitions that have change base on the replica given. + * + * The changes identified are: + * 1. topic partitions for which the broker is not a replica anymore + * 2. topic partitions for which the broker is now the leader + * 3. topic partitions for which the broker is now a follower + * + * @param brokerId the broker id + * @return the list of topic partitions which the broker should remove, become leader or become follower. + */ + public LocalReplicaChanges localChanges(int brokerId) { + Set<TopicPartition> deletes = new HashSet<>(); + Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>(); + Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>(); + + for (TopicDelta delta : changedTopics.values()) { + LocalReplicaChanges changes = delta.localChanges(brokerId); + + deletes.addAll(changes.deletes()); + leaders.putAll(changes.leaders()); + followers.putAll(changes.followers()); + } + + // Add all of the deleted topic partitions to the map of locally removed partitions + deletedTopicIds().forEach(topicId -> { + TopicImage topicImage = image().getTopic(topicId); + topicImage.partitions().keySet().forEach(partitionId -> { + deletes.add(new TopicPartition(topicImage.name(), partitionId)); Review comment: Yes. Fixed and updated the test in `TopicsImageTest` to check for this case. ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -3020,34 +2949,183 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testDeltaFollowerToNotReplica(): Unit = { + val localId = 1 + val otherId = localId + 1 + val topicPartition = new TopicPartition("foo", 0) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) + + try { + // Make the local replica the follower + val followerTopicsDelta = topicsCreateDelta(localId, false) + val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta) + + // Check the state of that partition and fetcher + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(0, followerPartition.getLeaderEpoch) + + val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) + assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker)) + + // Apply changes that remove replica + val notReplicaTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), otherId, true) + val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply()) + replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta) + + // Check that the partition was removed + assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition)) + assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) + assertEquals(None, replicaManager.logManager.getLog(topicPartition)) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) + } + + @Test + def testDeltaFollowerRemovedTopic(): Unit = { + val localId = 1 + val otherId = localId + 1 + val topicPartition = new TopicPartition("foo", 0) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) + + try { + // Make the local replica the follower + val followerTopicsDelta = topicsCreateDelta(localId, false) + val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta) + + // Check the state of that partition and fetcher + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(0, followerPartition.getLeaderEpoch) + + val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) + assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker)) + + // Apply changes that remove topic and replica + val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics()) + val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply()) + replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta) + + // Check that the partition was removed + assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition)) + assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) + assertEquals(None, replicaManager.logManager.getLog(topicPartition)) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) + } + + @Test + def testDeltaLeaderToNotReplica(): Unit = { + val localId = 1 + val otherId = localId + 1 + val topicPartition = new TopicPartition("foo", 0) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) + + try { + // Make the local replica the follower + val leaderTopicsDelta = topicsCreateDelta(localId, true) + val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) + replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta) + + // Check the state of that partition and fetcher + val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) + assertTrue(leaderPartition.isLeader) + assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) + assertEquals(0, leaderPartition.getLeaderEpoch) + + assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) + + // Apply changes that remove replica + val notReplicaTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), otherId, true) + val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply()) + replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta) + + // Check that the partition was removed Review comment: Good idea. Added the checks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org