chia7712 commented on code in PR #20003:
URL: https://github.com/apache/kafka/pull/20003#discussion_r2164659673


##########
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java:
##########
@@ -351,6 +351,42 @@ public void testCancellationWithAddingReplicaInIsr() throws Exception {
         verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4));
     }
 
+    @ClusterTest
+    public void testCancellationWithAddingReplicaInIsr() throws Exception {
+        createTopics();
+        TopicPartition foo0 = new TopicPartition("foo", 0);
+        produceMessages(foo0.topic(), foo0.partition(), 200);
+
+        // The reassignment will bring replicas 3 and 4 into the replica set.
+        String assignment = "{\"version\":1,\"partitions\":" +
+            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\",\"any\"]}" +
+            "]}";
+
+        // We will throttle replica 4 so that only replica 3 joins the ISR
+        setReplicationThrottleForPartitions(foo0);
+
+        // Execute the assignment and wait for replica 3 (only) to join the ISR
+        runExecuteAssignment(false, assignment, -1L, -1L);
+        try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
+            TestUtils.waitForCondition(
+                () -> {
+                    Set<Integer> isr = admin.describeTopics(Collections.singleton(foo0.topic()))
+                        .allTopicNames().get().get(foo0.topic()).partitions().stream()
+                        .filter(p -> p.partition() == foo0.partition())
+                        .flatMap(p -> p.isr().stream())
+                        .map(Node::id).collect(Collectors.toSet());
+                    return isr.containsAll(Arrays.asList(0, 1, 2, 3));
+                },
+                "Timed out while waiting for replica 3 to join the ISR"
+            );
+        }
+
+        // Now cancel the assignment and verify that the partition is removed from cancelled replicas
+        assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true, true));

Review Comment:
   `runCancelAssignment` returns a `SimpleImmutableEntry`, so you use
`SimpleImmutableEntry` to verify the value, right? If so, that is fine in this
PR. However, could you please file a separate PR to replace
`SimpleImmutableEntry` with `Map.entry`?
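
   For reference, a minimal sketch of what that replacement could look like. The
class and method names below are hypothetical, and plain strings stand in for
`TopicPartition`; this is not the actual `ReassignPartitionsCommand` code. It
assumes Java 9+ for `Map.entry`, which, unlike `SimpleImmutableEntry`, rejects
null keys and values.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class EntryComparison {
    // Hypothetical stand-in for a helper that still returns SimpleImmutableEntry.
    static Map.Entry<Set<String>, Set<String>> legacyResult() {
        return new AbstractMap.SimpleImmutableEntry<>(Collections.singleton("foo-0"), Collections.emptySet());
    }

    // The same pair built with Map.entry (Java 9+); null keys and values are rejected.
    static Map.Entry<Set<String>, Set<String>> modernResult() {
        return Map.entry(Collections.singleton("foo-0"), Collections.emptySet());
    }

    public static void main(String[] args) {
        // Both implementations follow the Map.Entry equals contract,
        // so the comparison is checked in both directions.
        System.out.println(modernResult().equals(legacyResult()));
        System.out.println(legacyResult().equals(modernResult()));
    }
}
```

   Since equality is defined by the `Map.Entry` contract (key and value
equality), an assertion written with `Map.entry` should keep passing whether or
not the helper's return value has been migrated yet.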


