showuon commented on code in PR #14891:
URL: https://github.com/apache/kafka/pull/14891#discussion_r1450086585


##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -472,10 +516,11 @@ public void testCreateAlterTopicWithRackAware(String quorum) throws Exception {
             "--topic", testTopicName);
         topicService.alterTopic(alterOpts);
 
+        TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
         kafka.utils.TestUtils.waitUntilTrue(
             () -> brokers().forall(p -> p.metadataCache().getTopicPartitions(testTopicName).size() == alteredNumPartitions),
             () -> "Timeout waiting for new assignment propagating to broker",
-            org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
+            org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 500L);

Review Comment:
   Why change `pause` from 100ms to 500ms?



##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -159,100 +166,117 @@ public void close() throws Exception {
     @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreate(String quorum) throws Exception {
-        createAndWaitTopic(buildTopicCommandOptionsWithBootstrap(
-            "--create", "--partitions", "2", "--replication-factor", "1", "--topic", testTopicName));
-
-        assertTrue(adminClient.listTopics().names().get().contains(testTopicName));
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, 1,
+                scala.collection.immutable.Map$.MODULE$.empty(), new Properties()

Review Comment:
   Also, if `createAndWaitTopic` and the other util methods in this class are 
no longer used, should we remove them as well?



##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -159,100 +166,117 @@ public void close() throws Exception {
     @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreate(String quorum) throws Exception {
-        createAndWaitTopic(buildTopicCommandOptionsWithBootstrap(
-            "--create", "--partitions", "2", "--replication-factor", "1", "--topic", testTopicName));
-
-        assertTrue(adminClient.listTopics().names().get().contains(testTopicName));
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, 1,
+                scala.collection.immutable.Map$.MODULE$.empty(), new Properties()

Review Comment:
   Could you explain the difference between the existing 
`createAndWaitTopic` and the `createTopicWithAdmin` util methods?



##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -787,16 +827,28 @@ public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(St
 
         // describe the topic and test if it's under-replicated
         String simpleDescribeOutput = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName));
-        String[] simpleDescribeOutputRows = simpleDescribeOutput.split("\n");
-        assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", testTopicName)));
-        assertEquals(2, simpleDescribeOutputRows.length);
+        String[] simpleDescribeOutputRows = simpleDescribeOutput.split(System.lineSeparator());
+        assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", testTopicName)),
+                "Unexpected describe output: " + simpleDescribeOutputRows[0]);
+        assertEquals(2, simpleDescribeOutputRows.length,
+                "Unexpected describe output length: " + simpleDescribeOutputRows.length);
 
         String underReplicatedOutput = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-replicated-partitions"));
         assertEquals("", underReplicatedOutput,
             String.format("--under-replicated-partitions shouldn't return anything: '%s'", underReplicatedOutput));
 
         // Verify reassignment is still ongoing.
-        PartitionReassignment reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp);
+        PartitionReassignment reassignments = null;
+        int retryCount = 0;
+        int maxRetries = 20;
+        while (reassignments == null && retryCount < maxRetries) {
+            reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp);
+            if (reassignments != null) {
+                break;
+            }
+            retryCount++;
+            Thread.sleep(100L);
+        }
         assertFalse(reassignments.addingReplicas().isEmpty());

Review Comment:
   Could you explain why we need this change? And why can't we use 
`TestUtils.waitUntilTrue`?
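
   For illustration only, here is a minimal sketch of the pattern a 
`waitUntilTrue`-style helper provides: poll a condition until it holds or a 
deadline passes, then fail with a descriptive message. The `PollingSketch` 
class and its method signature are hypothetical and standalone; this is not 
Kafka's actual `TestUtils` implementation.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical, standalone stand-in for a waitUntilTrue-style test helper.
class PollingSketch {

    // Polls `condition` every `pauseMs` until it returns true.
    // Throws AssertionError with `message` once `maxWaitMs` has elapsed.
    static void waitUntilTrue(BooleanSupplier condition,
                              Supplier<String> message,
                              long maxWaitMs,
                              long pauseMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return;
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new AssertionError(message.get());
            }
            // Never sleep past the deadline, and never pass a non-positive value.
            Thread.sleep(Math.max(1L, Math.min(pauseMs, remaining)));
        }
    }
}
```

   With a helper of this shape, the retry loop in the diff could in 
principle become a single call whose condition fetches the reassignment and 
checks that it is non-null, so a timeout fails with a clear message rather 
than a `NullPointerException` on the following assertion.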



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.