jsancio commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r697549554



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -804,7 +817,7 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, 
List<ApiMessageAndVers
                 TopicControlInfo topic = topics.get(topicEntry.getValue());
                 if (topic != null) {
                     for (int partitionId : topic.parts.keySet()) {
-                        ApiError error = electLeader(topicName, partitionId, 
uncleanOk, records);
+                        ApiError error = electLeader(topicName, partitionId, 
electionType, records);

Review comment:
       Maybe we need an integration test for this, but if the ApiError is `new 
ApiError(Errors.ELECTION_NOT_NEEDED)` we should not add the topic partition to 
the response. In the ZK implementation this filtering is done in KafkaApis. See 
`sendResponseCallback` in `handleElectLeaders`.

##########
File path: core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
##########
@@ -18,188 +18,249 @@ package kafka.admin
 
 import java.io.File
 import java.nio.charset.StandardCharsets
-import java.nio.file.Files
-import java.nio.file.Path
+import java.nio.file.{Files, Path}
 
 import kafka.common.AdminCommandFailedException
-import kafka.server.KafkaConfig
-import kafka.server.KafkaServer
+import kafka.server.IntegrationTestUtils.createTopic
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.{ClusterConfig, ClusterInstance}
 import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
+import org.apache.kafka.clients.admin.AdminClientConfig
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.errors.{TimeoutException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.network.ListenerName
-import org.junit.jupiter.api.AfterEach
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{BeforeEach, Tag, Test}
 
-import scala.jdk.CollectionConverters._
-import scala.collection.Seq
 import scala.concurrent.duration._
 
-final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3)
+@Tag("integration")
+final class LeaderElectionCommandTest(cluster: ClusterInstance) {
   import LeaderElectionCommandTest._
 
-  var servers = Seq.empty[KafkaServer]
   val broker1 = 0
   val broker2 = 1
   val broker3 = 2
 
   @BeforeEach
-  override def setUp(): Unit = {
-    super.setUp()
-
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
-    servers = brokerConfigs.map { config =>
-      config.setProperty("auto.leader.rebalance.enable", "false")
-      config.setProperty("controlled.shutdown.enable", "true")
-      config.setProperty("controlled.shutdown.max.retries", "1")
-      config.setProperty("controlled.shutdown.retry.backoff.ms", "1000")
-      TestUtils.createServer(KafkaConfig.fromProps(config))
-    }
+  def setup(clusterConfig: ClusterConfig): Unit = {
+    TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
+    
clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp, 
"false")
+    
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp, 
"true")
+    
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp,
 "1")
+    
clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp,
 "1000")
+    
clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp,
 "2")
   }
 
-  @AfterEach
-  override def tearDown(): Unit = {
-    TestUtils.shutdownServers(servers)
-
-    super.tearDown()
-  }
-
-  @Test
+  @ClusterTest
   def testAllTopicPartition(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
-
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), 
servers)
+    val client = cluster.createAdminClient()
+    val topic = "unclean-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
 
-      val topicPartition = new TopicPartition(topic, partition)
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    val topicPartition = new TopicPartition(topic, partition)
 
-      servers(broker3).shutdown()
-      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), 
Set(broker3))
-      servers(broker2).shutdown()
-      TestUtils.assertNoLeader(client, topicPartition)
-      servers(broker3).startup()
+    TestUtils.assertLeader(client, topicPartition, broker2)
+    cluster.shutdownBroker(broker3)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertNoLeader(client, topicPartition)
+    cluster.startBroker(broker3)
+    TestUtils.waitForOnlineBroker(client, broker3)
 
-      LeaderElectionCommand.main(
-        Array(
-          "--bootstrap-server", bootstrapServers(servers),
-          "--election-type", "unclean",
-          "--all-topic-partitions"
-        )
+    LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "unclean",
+        "--all-topic-partitions"
       )
+    )
 
-      TestUtils.assertLeader(client, topicPartition, broker3)
-    }
+    TestUtils.assertLeader(client, topicPartition, broker3)
   }
 
-  @Test
+  @ClusterTest
   def testTopicPartition(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
+    val client = cluster.createAdminClient()
+    val topic = "unclean-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
 
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), 
servers)
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
 
-      val topicPartition = new TopicPartition(topic, partition)
+    val topicPartition = new TopicPartition(topic, partition)
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    TestUtils.assertLeader(client, topicPartition, broker2)
 
-      servers(broker3).shutdown()
-      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), 
Set(broker3))
-      servers(broker2).shutdown()
-      TestUtils.assertNoLeader(client, topicPartition)
-      servers(broker3).startup()
+    cluster.shutdownBroker(broker3)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertNoLeader(client, topicPartition)
+    cluster.startBroker(broker3)
+    TestUtils.waitForOnlineBroker(client, broker3)
 
-      LeaderElectionCommand.main(
-        Array(
-          "--bootstrap-server", bootstrapServers(servers),
-          "--election-type", "unclean",
-          "--topic", topic,
-          "--partition", partition.toString
-        )
+    LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "unclean",
+        "--topic", topic,
+        "--partition", partition.toString
       )
+    )
 
-      TestUtils.assertLeader(client, topicPartition, broker3)
-    }
+    TestUtils.assertLeader(client, topicPartition, broker3)
   }
 
-  @Test
+  @ClusterTest
   def testPathToJsonFile(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
+    val client = cluster.createAdminClient()
+    val topic = "unclean-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
 
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), 
servers)
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
 
-      val topicPartition = new TopicPartition(topic, partition)
+    val topicPartition = new TopicPartition(topic, partition)
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    TestUtils.assertLeader(client, topicPartition, broker2)
 
-      servers(broker3).shutdown()
-      TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), 
Set(broker3))
-      servers(broker2).shutdown()
-      TestUtils.assertNoLeader(client, topicPartition)
-      servers(broker3).startup()
+    cluster.shutdownBroker(broker3)
+    TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertNoLeader(client, topicPartition)
+    cluster.startBroker(broker3)
+    TestUtils.waitForOnlineBroker(client, broker3)
 
-      val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
+    val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
 
-      LeaderElectionCommand.main(
-        Array(
-          "--bootstrap-server", bootstrapServers(servers),
-          "--election-type", "unclean",
-          "--path-to-json-file", topicPartitionPath.toString
-        )
+    LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "unclean",
+        "--path-to-json-file", topicPartitionPath.toString
       )
+    )
 
-      TestUtils.assertLeader(client, topicPartition, broker3)
-    }
+    TestUtils.assertLeader(client, topicPartition, broker3)
   }
 
-  @Test
+  @ClusterTest
   def testPreferredReplicaElection(): Unit = {
-    TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
-      val topic = "unclean-topic"
-      val partition = 0
-      val assignment = Seq(broker2, broker3)
+    val client = cluster.createAdminClient()
+    val topic = "preferred-topic"
+    val partition = 0
+    val assignment = Seq(broker2, broker3)
+
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(partition -> assignment))
+
+    val topicPartition = new TopicPartition(topic, partition)
+
+    TestUtils.assertLeader(client, topicPartition, broker2)
+
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertLeader(client, topicPartition, broker3)
+    cluster.startBroker(broker2)
+    TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
+
+    LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "preferred",
+        "--all-topic-partitions"
+      )
+    )
+
+    TestUtils.assertLeader(client, topicPartition, broker2)
+  }
 
-      TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), 
servers)
+  @ClusterTest
+  def testTopicDoesNotExist(): Unit = {
+    val e = assertThrows(classOf[AdminCommandFailedException], () => 
LeaderElectionCommand.main(
+      Array(
+        "--bootstrap-server", cluster.bootstrapServers(),
+        "--election-type", "preferred",
+        "--topic", "unknown-topic-name",
+        "--partition", "0"
+      )
+    ))
+    
assertTrue(e.getSuppressed()(0).isInstanceOf[UnknownTopicOrPartitionException])
+  }
 
-      val topicPartition = new TopicPartition(topic, partition)
+  @ClusterTest
+  def testElectionResultOutput(): Unit = {
+    val client = cluster.createAdminClient()
+    val topic = "non-preferred-topic"
+    val partition0 = 0
+    val partition1 = 1
+    val assignment0 = Seq(broker2, broker3)
+    val assignment1 = Seq(broker3, broker2)
+
+    cluster.waitForReadyBrokers()
+    createTopic(client, topic, Map(
+      partition0 -> assignment0,
+      partition1 -> assignment1
+    ))
+
+    val topicPartition0 = new TopicPartition(topic, partition0)
+    val topicPartition1 = new TopicPartition(topic, partition1)
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
+    TestUtils.assertLeader(client, topicPartition0, broker2)
+    TestUtils.assertLeader(client, topicPartition1, broker3)
 
-      servers(broker2).shutdown()
-      TestUtils.assertLeader(client, topicPartition, broker3)
-      servers(broker2).startup()
-      TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
+    cluster.shutdownBroker(broker2)
+    TestUtils.assertLeader(client, topicPartition0, broker3)
+    cluster.startBroker(broker2)
+    TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2))
+    TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2))
 
+    val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, 
topicPartition1))
+    val output = TestUtils.grabConsoleOutput(
       LeaderElectionCommand.main(
         Array(
-          "--bootstrap-server", bootstrapServers(servers),
+          "--bootstrap-server", cluster.bootstrapServers(),
           "--election-type", "preferred",
-          "--all-topic-partitions"
+          "--path-to-json-file", topicPartitionPath.toString
         )
       )
+    )
 
-      TestUtils.assertLeader(client, topicPartition, broker2)
-    }
+    val electionResultOutputIter = output.split("\n").iterator
+
+    assertTrue(electionResultOutputIter.hasNext)
+    val firstLine = electionResultOutputIter.next()
+    assertTrue(firstLine.contains(s"Successfully completed leader election 
(PREFERRED) for partitions $topicPartition0"),
+    s"Unexpected output: $firstLine")
+
+    assertTrue(electionResultOutputIter.hasNext)
+    val secondLine = electionResultOutputIter.next()
+    assertTrue(secondLine.contains(s"Valid replica already elected for 
partitions $topicPartition1"),
+    s"Unexpected output: $secondLine")
   }
+}
+
+/**
+ * For some error cases, we can save a little build time by avoiding the 
overhead
+ * for cluster creation and cleanup.
+ */
+class LeaderElectionCommandErrorTest {

Review comment:
       Good idea. I searched for "CommandErrorTest" and it looks like this is 
the first time we do this. Should we standardize this pattern and move this 
class to its own file?

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -1052,6 +1114,160 @@ public void 
testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws
             ctx.replicationControl.getPartition(fooId, 1));
     }
 
+    private void assertLeaderAndIsr(
+        ReplicationControlManager replication,
+        TopicIdPartition topicIdPartition,
+        int leaderId,
+        int[] isr
+    ) {
+        PartitionRegistration registration = replication.getPartition(
+            topicIdPartition.topicId(),
+            topicIdPartition.partitionId()
+        );
+        assertArrayEquals(isr, registration.isr);
+        assertEquals(leaderId, registration.leader);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testElectUncleanLeaders(boolean electAllPartitions) throws 
Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 
1}}).topicId();
+
+        TopicIdPartition partition0 = new TopicIdPartition(fooId, 0);
+        TopicIdPartition partition1 = new TopicIdPartition(fooId, 1);
+        TopicIdPartition partition2 = new TopicIdPartition(fooId, 2);
+
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+        ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
+
+        assertLeaderAndIsr(replication, partition0, NO_LEADER, new int[]{1});
+        assertLeaderAndIsr(replication, partition1, 4, new int[]{4});
+        assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
+
+        ElectLeadersRequestData request = buildElectLeadersRequest(
+            ElectionType.UNCLEAN,
+            electAllPartitions ? null : singletonMap("foo", asList(0, 1, 2))
+        );
+
+        // No election can be done yet because no replicas are available for 
partition 0
+        ControllerResult<ElectLeadersResponseData> result1 = 
replication.electLeaders(request);
+        assertEquals(Collections.emptyList(), result1.records());
+
+        ElectLeadersResponseData expectedResponse1 = 
buildElectLeadersResponse(NONE, Utils.mkMap(
+            Utils.mkEntry(
+                new TopicPartition("foo", 0),
+                new ApiError(ELIGIBLE_LEADERS_NOT_AVAILABLE)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 1),
+                new ApiError(ELECTION_NOT_NEEDED)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 2),
+                new ApiError(ELECTION_NOT_NEEDED)
+            )

Review comment:
       I think we are not supposed to return partitions 1 and 2 if `null` is 
used in the elect leaders request. In other words, when all partitions are 
elected, only the partitions whose leader changed are expected to be returned.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -1052,6 +1114,160 @@ public void 
testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws
             ctx.replicationControl.getPartition(fooId, 1));
     }
 
+    private void assertLeaderAndIsr(
+        ReplicationControlManager replication,
+        TopicIdPartition topicIdPartition,
+        int leaderId,
+        int[] isr
+    ) {
+        PartitionRegistration registration = replication.getPartition(
+            topicIdPartition.topicId(),
+            topicIdPartition.partitionId()
+        );
+        assertArrayEquals(isr, registration.isr);
+        assertEquals(leaderId, registration.leader);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testElectUncleanLeaders(boolean electAllPartitions) throws 
Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 
1}}).topicId();
+
+        TopicIdPartition partition0 = new TopicIdPartition(fooId, 0);
+        TopicIdPartition partition1 = new TopicIdPartition(fooId, 1);
+        TopicIdPartition partition2 = new TopicIdPartition(fooId, 2);
+
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+        ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
+
+        assertLeaderAndIsr(replication, partition0, NO_LEADER, new int[]{1});
+        assertLeaderAndIsr(replication, partition1, 4, new int[]{4});
+        assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
+
+        ElectLeadersRequestData request = buildElectLeadersRequest(
+            ElectionType.UNCLEAN,
+            electAllPartitions ? null : singletonMap("foo", asList(0, 1, 2))
+        );
+
+        // No election can be done yet because no replicas are available for 
partition 0
+        ControllerResult<ElectLeadersResponseData> result1 = 
replication.electLeaders(request);
+        assertEquals(Collections.emptyList(), result1.records());
+
+        ElectLeadersResponseData expectedResponse1 = 
buildElectLeadersResponse(NONE, Utils.mkMap(
+            Utils.mkEntry(
+                new TopicPartition("foo", 0),
+                new ApiError(ELIGIBLE_LEADERS_NOT_AVAILABLE)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 1),
+                new ApiError(ELECTION_NOT_NEEDED)
+            ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 2),
+                new ApiError(ELECTION_NOT_NEEDED)
+            )

Review comment:
       This comment applies to a few places in this file.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -856,21 +867,24 @@ ApiError electLeader(String topic, int partitionId, 
boolean uncleanOk,
             return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
                 "No such partition as " + topic + "-" + partitionId);
         }
+        if ((electionType == ElectionType.PREFERRED && 
partition.hasPreferredLeader())
+            || (electionType == ElectionType.UNCLEAN && 
partition.hasLeader())) {
+            return new ApiError(Errors.ELECTION_NOT_NEEDED);
+        }
+
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
             topicId,
             partitionId,
             r -> clusterControl.unfenced(r),
-            () -> uncleanOk || 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic));
-        builder.setAlwaysElectPreferredIfPossible(true);
+            () -> electionType == ElectionType.UNCLEAN);
+
+        builder.setAlwaysElectPreferredIfPossible(electionType == 
ElectionType.PREFERRED);

Review comment:
       Outside the scope of this change, but do you think that it would help if 
internally we supported 3 election types: PREFERRED, ANY, UNCLEAN? With this enum we 
could change the constructor for `PartitionChangeBuilder` to take a function that 
returns this enum. Would that help with the API for `PartitionChangeBuilder` 
and cover all of its uses?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to