jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689870784



##########
File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
##########
@@ -126,6 +133,158 @@ private static TopicImage newTopicImage(String name, Uuid 
id, PartitionRegistrat
         IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), 
newTopicsByNameMap(topics2));
     }
 
+    private PartitionRegistration newPartition(int[] replicas) {
+        return new PartitionRegistration(replicas, replicas, Replicas.NONE, 
Replicas.NONE, replicas[0], 1, 1);
+    }
+
+    @Test
+    public void testLocalReplicaChanges() {
+        int localId = 3;
+        Uuid newFooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+
+        List<TopicImage> topics = new ArrayList<>(TOPIC_IMAGES1);
+        topics.add(
+            newTopicImage(
+                "foo",
+                newFooId,
+                newPartition(new int[] {0, 1, 3}),
+                newPartition(new int[] {3, 1, 2}),
+                newPartition(new int[] {0, 1, 3}),
+                newPartition(new int[] {3, 1, 2}),
+                newPartition(new int[] {0, 1, 2}),
+                newPartition(new int[] {0, 1, 2})
+            )
+        );
+
+        TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), 
newTopicsByNameMap(topics));
+
+        List<ApiMessageAndVersion> topicRecords = new 
ArrayList<>(DELTA1_RECORDS);
+        // foo-0 - follower to leader
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                  .setTopicId(newFooId)
+                  .setPartitionId(0)
+                  .setLeader(3),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // foo-1 - leader to follower
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                  .setTopicId(newFooId)
+                  .setPartitionId(1)
+                  .setLeader(1),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // foo-2 - follower to removed
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                  .setTopicId(newFooId)
+                  .setPartitionId(2)
+                  .setIsr(Arrays.asList(0, 1, 2))
+                  .setReplicas(Arrays.asList(0, 1, 2)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // foo-3 - leader to removed
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                  .setTopicId(newFooId)
+                  .setPartitionId(3)
+                  .setLeader(0)
+                  .setIsr(Arrays.asList(0, 1, 2))
+                  .setReplicas(Arrays.asList(0, 1, 2)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // foo-4 - not replica to leader
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                  .setTopicId(newFooId)
+                  .setPartitionId(4)
+                  .setLeader(3)
+                  .setIsr(Arrays.asList(3, 1, 2))
+                  .setReplicas(Arrays.asList(3, 1, 2)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+        // foo-5 - not replica to follower
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionChangeRecord()
+                  .setTopicId(newFooId)
+                  .setPartitionId(5)
+                  .setIsr(Arrays.asList(0, 1, 3))
+                  .setReplicas(Arrays.asList(0, 1, 3)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+
+        /* Changes already include in DELTA1_RECORDS:
+         * foo - topic id deleted
+         * bar-0 - stay as follower with different partition epoch
+         * baz-0 - new topic to leader
+         */
+
+        // baz-1 - new topic to follower
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionRecord()
+                    .setPartitionId(1)
+                    .setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+                    .setReplicas(Arrays.asList(4, 2, 3))
+                    .setIsr(Arrays.asList(4, 2, 3))
+                    .setLeader(4)
+                    .setLeaderEpoch(2)
+                    .setPartitionEpoch(1),
+                PARTITION_RECORD.highestSupportedVersion()
+            )
+        );
+
+        TopicsDelta delta = new TopicsDelta(image);
+        RecordTestUtils.replayAll(delta, topicRecords);
+
+        LocalReplicaChanges changes = delta.localChanges(localId);
+        assertEquals(
+            new HashSet<>(
+                Arrays.asList(
+                    new TopicPartition("foo", 2),
+                    new TopicPartition("foo", 3),
+                    new TopicPartition("foo", 0), // from remove topic of the 
old id

Review comment:
       Yeah. This is a bit subtle but in this batch of records the order of 
operation is:
   
   1. Delete `foo` with id `ThIaNwRnSM2Nt9Mx1v0RvA`: 
https://github.com/apache/kafka/pull/11216/files#diff-5a7faa2af57ac14f91b81428d8933208a80f9ffb035f76d63fca310e3cc21213R102-R104
   2. Recreate `foo` with id `0hHJ3X5ZQ-CFfQ5xgpj90w`: 
https://github.com/apache/kafka/pull/11216/files#diff-5a7faa2af57ac14f91b81428d8933208a80f9ffb035f76d63fca310e3cc21213R162-R171
   
   So we want to first delete `foo` with the old id the start a new replica for 
the new topic id: 
https://github.com/apache/kafka/pull/11216/files#diff-5a7faa2af57ac14f91b81428d8933208a80f9ffb035f76d63fca310e3cc21213R268
   
   The order in which we apply the changes is important that is why we always 
delete first and then become follower or leader:
   
https://github.com/apache/kafka/pull/11216/files#diff-78812e247ffeae6f8c49b1b22506434701b1e1bafe7f92ef8f8708059e292bf0R2087-R2088
   
   There is an issue with the test where I already included the new topic foo 
in the image before applying the delta. Let me fix that: 
https://github.com/apache/kafka/pull/11216/files#diff-5a7faa2af57ac14f91b81428d8933208a80f9ffb035f76d63fca310e3cc21213R146-R157




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to