chia7712 commented on code in PR #20068:
URL: https://github.com/apache/kafka/pull/20068#discussion_r2187641399


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -1335,11 +1335,14 @@ object TestUtils extends Logging {
       case Failure(e) => throw e
     }
 
-    assertTrue(isLeaderElected, s"Timed out waiting for leader to become 
$expectedLeaderOpt. " +
+    assertTrue(isLeaderElected, s"Timed out waiting for leader to become 
expectedLeaderOpt. " +

Review Comment:
   Why do we need this change?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2873,23 +2873,36 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     /** Changes the <i>preferred</i> leader without changing the 
<i>current</i> leader. */
     def changePreferredLeader(newAssignment: Seq[Int]): Unit = {
       val preferred = newAssignment.head
-      val prior1 = 
brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, 
partition1.partition(), listenerName).get.id()
-      val prior2 = 
brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, 
partition2.partition(), listenerName).get.id()
-
-      var m = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
-      if (prior1 != preferred)
-        m += partition1 -> Optional.of(new 
NewPartitionReassignment(newAssignment.map(Int.box).asJava))
-      if (prior2 != preferred)
-        m += partition2 -> Optional.of(new 
NewPartitionReassignment(newAssignment.map(Int.box).asJava))
-      client.alterPartitionReassignments(m.asJava).all().get()
+      var prior1 = 
brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, 
partition1.partition(), listenerName).get.id()
+      var prior2 = 
brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, 
partition2.partition(), listenerName).get.id()
 
       TestUtils.waitUntilTrue(
-        () => preferredLeader(partition1) == preferred && 
preferredLeader(partition2) == preferred,
+        () => {
+          prior1 = 
brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, 
partition1.partition(), listenerName).get.id()
+          prior2 = 
brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, 
partition2.partition(), listenerName).get.id()
+          var reassignmentMap = Map.empty[TopicPartition, 
Optional[NewPartitionReassignment]]
+            if (prior1 != preferred)
+              reassignmentMap += partition1 -> Optional.of(new 
NewPartitionReassignment(newAssignment.map(Int.box).asJava))
+            if (prior2 != preferred)
+              reassignmentMap += partition2 -> Optional.of(new 
NewPartitionReassignment(newAssignment.map(Int.box).asJava))
+          
client.alterPartitionReassignments(reassignmentMap.asJava).all().get()
+          preferredLeader(partition1) == preferred && 
preferredLeader(partition2) == preferred},
         s"Expected preferred leader to become $preferred, but is 
${preferredLeader(partition1)} and ${preferredLeader(partition2)}",
-        10000)
+        10000L)
+
       // Check the leader hasn't moved
-      TestUtils.assertLeader(client, partition1, prior1)
-      TestUtils.assertLeader(client, partition2, prior2)
+      TestUtils.waitUntilTrue(

Review Comment:
   It seems the way to stabilize the test is to ensure "all" brokers metadata 
get up-to-date, right?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2961,12 +2973,15 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(util.Set.of(partition2), electResult.partitions.get.keySet)
     assertFalse(electResult.partitions.get.get(partition2).isPresent)
     TestUtils.assertLeader(client, partition2, 2)
-
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
+
     // but shut it down...
     killBroker(1)
-    TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), 
Set(1))
+    TestUtils.waitUntilTrue(

Review Comment:
   all we need is the `waitForBrokersOutOfIsr`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to