junrao commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689841279
##########
File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
##########
@@ -126,6 +133,158 @@ private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistrat
         IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
     }
 
+    private PartitionRegistration newPartition(int[] replicas) {
+        return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1);
+    }
+
+    @Test
+    public void testLocalReplicaChanges() {
+        int localId = 3;
+        Uuid newFooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+
+        List<TopicImage> topics = new ArrayList<>(TOPIC_IMAGES1);
+        topics.add(
+            newTopicImage(
+                "foo",
+                newFooId,
+                newPartition(new int[] {0, 1, 3}),
+                newPartition(new int[] {3, 1, 2}),
+                newPartition(new int[] {0, 1, 3}),
+                newPartition(new int[] {3, 1, 2}),
+                newPartition(new int[] {0, 1, 2}),
+                newPartition(new int[] {0, 1, 2})
+            )
+        );
+
+        TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics));
+
+        List<ApiMessageAndVersion> topicRecords = new ArrayList<>(DELTA1_RECORDS);
+        // foo-0 - follower to leader
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                    .setTopicId(newFooId)
+                    .setPartitionId(0)
+                    .setLeader(3),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // foo-1 - leader to follower
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                    .setTopicId(newFooId)
+                    .setPartitionId(1)
+                    .setLeader(1),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // foo-2 - follower to removed
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                    .setTopicId(newFooId)
+                    .setPartitionId(2)
+                    .setIsr(Arrays.asList(0, 1, 2))
+                    .setReplicas(Arrays.asList(0, 1, 2)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // foo-3 - leader to removed
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                    .setTopicId(newFooId)
+                    .setPartitionId(3)
+                    .setLeader(0)
+                    .setIsr(Arrays.asList(0, 1, 2))
+                    .setReplicas(Arrays.asList(0, 1, 2)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // foo-4 - not replica to leader
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                    .setTopicId(newFooId)
+                    .setPartitionId(4)
+                    .setLeader(3)
+                    .setIsr(Arrays.asList(3, 1, 2))
+                    .setReplicas(Arrays.asList(3, 1, 2)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // foo-5 - not replica to follower
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                    .setTopicId(newFooId)
+                    .setPartitionId(5)
+                    .setIsr(Arrays.asList(0, 1, 3))
+                    .setReplicas(Arrays.asList(0, 1, 3)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+
+        /* Changes already include in DELTA1_RECORDS:
+         * foo - topic id deleted
+         * bar-0 - stay as follower with different partition epoch
+         * baz-0 - new topic to leader
+         */
+
+        // baz-1 - new topic to follower
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionRecord()
+                    .setPartitionId(1)
+                    .setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+                    .setReplicas(Arrays.asList(4, 2, 3))
+                    .setIsr(Arrays.asList(4, 2, 3))
+                    .setLeader(4)
+                    .setLeaderEpoch(2)
+                    .setPartitionEpoch(1),
+                PARTITION_RECORD.highestSupportedVersion()
+            )
+        );
+
+        TopicsDelta delta = new TopicsDelta(image);
+        RecordTestUtils.replayAll(delta, topicRecords);
+
+        LocalReplicaChanges changes = delta.localChanges(localId);
+        assertEquals(
+            new HashSet<>(
+                Arrays.asList(
+                    new TopicPartition("foo", 2),
+                    new TopicPartition("foo", 3),
+                    new TopicPartition("foo", 0), // from remove topic of the old id

Review comment:
   Hmm, here, it seems that we recreated topic "foo". Deleting this partition will cause the recreated topic's log data to be deleted, which doesn't seem right?
##########
File path: core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
##########
@@ -410,6 +421,32 @@ class KRaftClusterTest {
     }
   }
 
+  private def checkReplicaManager(cluster: KafkaClusterTestKit, expectedHosting: List[(Int, List[Boolean])]): Unit = {
+    for ((brokerId, partitionsIsHosted) <- expectedHosting) {
+      val broker = cluster.brokers().get(brokerId)
+      // lock and unlock so we can read the replica manager

Review comment:
   Hmm, why do lock and unlock allow the reading of replica manager?
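For background on the idiom the comment is asking about, here is a minimal sketch of the usual rationale, assuming the field is assigned while the writer holds the same lock; the Broker and ReplicaManager classes below are hypothetical stand-ins, not Kafka's BrokerServer or KafkaClusterTestKit internals. Under the Java memory model, the writer's unlock happens-before a later acquire of the same lock by the reader, so acquiring and immediately releasing the lock is enough to make the published reference visible to the read that follows, even though no work is done inside the critical section.

    import java.util.concurrent.locks.ReentrantLock;

    // Sketch of the "lock and unlock so we can read" idiom; hypothetical classes only.
    public class LockVisibilitySketch {
        static class ReplicaManager { }

        static class Broker {
            final ReentrantLock lock = new ReentrantLock();
            ReplicaManager replicaManager;   // deliberately not volatile

            void startup() {
                lock.lock();
                try {
                    replicaManager = new ReplicaManager();   // published while holding the lock
                } finally {
                    lock.unlock();
                }
            }

            ReplicaManager readReplicaManager() {
                // Acquire and release the lock purely for its memory effect: once this
                // acquire comes after the startup thread's release, the write to
                // replicaManager is guaranteed (happens-before) to be visible here.
                lock.lock();
                lock.unlock();
                return replicaManager;
            }
        }

        public static void main(String[] args) {
            Broker broker = new Broker();
            new Thread(broker::startup).start();

            // Poll until the reference is visible; each iteration's lock/unlock pairs
            // the read with the writer's unlock, so the loop terminates once startup
            // has published the field.
            ReplicaManager rm;
            while ((rm = broker.readReplicaManager()) == null) {
                Thread.yield();
            }
            System.out.println("replicaManager visible: " + rm);
        }
    }

A volatile field, or an accessor that performs the read while holding the lock, would give the same visibility guarantee; the lock-then-unlock form simply reuses an existing lock without holding it across the read.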