jsancio commented on a change in pull request #11186: URL: https://github.com/apache/kafka/pull/11186#discussion_r697549554
########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -804,7 +817,7 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers TopicControlInfo topic = topics.get(topicEntry.getValue()); if (topic != null) { for (int partitionId : topic.parts.keySet()) { - ApiError error = electLeader(topicName, partitionId, uncleanOk, records); + ApiError error = electLeader(topicName, partitionId, electionType, records); Review comment: Maybe we need an integration test for this but if the ApiError is `new ApiError(Errors.ELECTION_NOT_NEEDED)` we should not add the topic partition to the result. In the ZK implementation this filtering is done in KafkaApis. See `sendResponseCallback` in `handleElectLeaders`. ########## File path: core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala ########## @@ -18,188 +18,249 @@ package kafka.admin import java.io.File import java.nio.charset.StandardCharsets -import java.nio.file.Files -import java.nio.file.Path +import java.nio.file.{Files, Path} import kafka.common.AdminCommandFailedException -import kafka.server.KafkaConfig -import kafka.server.KafkaServer +import kafka.server.IntegrationTestUtils.createTopic +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.{ClusterConfig, ClusterInstance} import kafka.utils.TestUtils -import kafka.zk.ZooKeeperTestHarness -import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} +import org.apache.kafka.clients.admin.AdminClientConfig import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.TimeoutException -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException +import org.apache.kafka.common.errors.{TimeoutException, UnknownTopicOrPartitionException} import org.apache.kafka.common.network.ListenerName -import 
org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.{BeforeEach, Tag, Test} -import scala.jdk.CollectionConverters._ -import scala.collection.Seq import scala.concurrent.duration._ -final class LeaderElectionCommandTest extends ZooKeeperTestHarness { +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3) +@Tag("integration") +final class LeaderElectionCommandTest(cluster: ClusterInstance) { import LeaderElectionCommandTest._ - var servers = Seq.empty[KafkaServer] val broker1 = 0 val broker2 = 1 val broker3 = 2 @BeforeEach - override def setUp(): Unit = { - super.setUp() - - val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false) - servers = brokerConfigs.map { config => - config.setProperty("auto.leader.rebalance.enable", "false") - config.setProperty("controlled.shutdown.enable", "true") - config.setProperty("controlled.shutdown.max.retries", "1") - config.setProperty("controlled.shutdown.retry.backoff.ms", "1000") - TestUtils.createServer(KafkaConfig.fromProps(config)) - } + def setup(clusterConfig: ClusterConfig): Unit = { + TestUtils.verifyNoUnexpectedThreads("@BeforeEach") + clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false") + clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp, "true") + clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp, "1") + clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000") + clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2") } - @AfterEach - override def tearDown(): Unit = { - TestUtils.shutdownServers(servers) - - super.tearDown() - } - - @Test + @ClusterTest def testAllTopicPartition(): Unit = { - 
TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client => - val topic = "unclean-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) - - TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers) + val client = cluster.createAdminClient() + val topic = "unclean-topic" + val partition = 0 + val assignment = Seq(broker2, broker3) - val topicPartition = new TopicPartition(topic, partition) + cluster.waitForReadyBrokers() + createTopic(client, topic, Map(partition -> assignment)) - TestUtils.assertLeader(client, topicPartition, broker2) + val topicPartition = new TopicPartition(topic, partition) - servers(broker3).shutdown() - TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) - servers(broker2).shutdown() - TestUtils.assertNoLeader(client, topicPartition) - servers(broker3).startup() + TestUtils.assertLeader(client, topicPartition, broker2) + cluster.shutdownBroker(broker3) + TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) + cluster.shutdownBroker(broker2) + TestUtils.assertNoLeader(client, topicPartition) + cluster.startBroker(broker3) + TestUtils.waitForOnlineBroker(client, broker3) - LeaderElectionCommand.main( - Array( - "--bootstrap-server", bootstrapServers(servers), - "--election-type", "unclean", - "--all-topic-partitions" - ) + LeaderElectionCommand.main( + Array( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "unclean", + "--all-topic-partitions" ) + ) - TestUtils.assertLeader(client, topicPartition, broker3) - } + TestUtils.assertLeader(client, topicPartition, broker3) } - @Test + @ClusterTest def testTopicPartition(): Unit = { - TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client => - val topic = "unclean-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) + val client = cluster.createAdminClient() + val topic = "unclean-topic" + val partition = 0 + val assignment = Seq(broker2, broker3) - 
TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers) + cluster.waitForReadyBrokers() + createTopic(client, topic, Map(partition -> assignment)) - val topicPartition = new TopicPartition(topic, partition) + val topicPartition = new TopicPartition(topic, partition) - TestUtils.assertLeader(client, topicPartition, broker2) + TestUtils.assertLeader(client, topicPartition, broker2) - servers(broker3).shutdown() - TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) - servers(broker2).shutdown() - TestUtils.assertNoLeader(client, topicPartition) - servers(broker3).startup() + cluster.shutdownBroker(broker3) + TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) + cluster.shutdownBroker(broker2) + TestUtils.assertNoLeader(client, topicPartition) + cluster.startBroker(broker3) + TestUtils.waitForOnlineBroker(client, broker3) - LeaderElectionCommand.main( - Array( - "--bootstrap-server", bootstrapServers(servers), - "--election-type", "unclean", - "--topic", topic, - "--partition", partition.toString - ) + LeaderElectionCommand.main( + Array( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "unclean", + "--topic", topic, + "--partition", partition.toString ) + ) - TestUtils.assertLeader(client, topicPartition, broker3) - } + TestUtils.assertLeader(client, topicPartition, broker3) } - @Test + @ClusterTest def testPathToJsonFile(): Unit = { - TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client => - val topic = "unclean-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) + val client = cluster.createAdminClient() + val topic = "unclean-topic" + val partition = 0 + val assignment = Seq(broker2, broker3) - TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers) + cluster.waitForReadyBrokers() + createTopic(client, topic, Map(partition -> assignment)) - val topicPartition = new TopicPartition(topic, partition) + val 
topicPartition = new TopicPartition(topic, partition) - TestUtils.assertLeader(client, topicPartition, broker2) + TestUtils.assertLeader(client, topicPartition, broker2) - servers(broker3).shutdown() - TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) - servers(broker2).shutdown() - TestUtils.assertNoLeader(client, topicPartition) - servers(broker3).startup() + cluster.shutdownBroker(broker3) + TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3)) + cluster.shutdownBroker(broker2) + TestUtils.assertNoLeader(client, topicPartition) + cluster.startBroker(broker3) + TestUtils.waitForOnlineBroker(client, broker3) - val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition)) + val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition)) - LeaderElectionCommand.main( - Array( - "--bootstrap-server", bootstrapServers(servers), - "--election-type", "unclean", - "--path-to-json-file", topicPartitionPath.toString - ) + LeaderElectionCommand.main( + Array( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "unclean", + "--path-to-json-file", topicPartitionPath.toString ) + ) - TestUtils.assertLeader(client, topicPartition, broker3) - } + TestUtils.assertLeader(client, topicPartition, broker3) } - @Test + @ClusterTest def testPreferredReplicaElection(): Unit = { - TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client => - val topic = "unclean-topic" - val partition = 0 - val assignment = Seq(broker2, broker3) + val client = cluster.createAdminClient() + val topic = "preferred-topic" + val partition = 0 + val assignment = Seq(broker2, broker3) + + cluster.waitForReadyBrokers() + createTopic(client, topic, Map(partition -> assignment)) + + val topicPartition = new TopicPartition(topic, partition) + + TestUtils.assertLeader(client, topicPartition, broker2) + + cluster.shutdownBroker(broker2) + TestUtils.assertLeader(client, topicPartition, broker3) + 
cluster.startBroker(broker2) + TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2)) + + LeaderElectionCommand.main( + Array( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "preferred", + "--all-topic-partitions" + ) + ) + + TestUtils.assertLeader(client, topicPartition, broker2) + } - TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers) + @ClusterTest + def testTopicDoesNotExist(): Unit = { + val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.main( + Array( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "preferred", + "--topic", "unknown-topic-name", + "--partition", "0" + ) + )) + assertTrue(e.getSuppressed()(0).isInstanceOf[UnknownTopicOrPartitionException]) + } - val topicPartition = new TopicPartition(topic, partition) + @ClusterTest + def testElectionResultOutput(): Unit = { + val client = cluster.createAdminClient() + val topic = "non-preferred-topic" + val partition0 = 0 + val partition1 = 1 + val assignment0 = Seq(broker2, broker3) + val assignment1 = Seq(broker3, broker2) + + cluster.waitForReadyBrokers() + createTopic(client, topic, Map( + partition0 -> assignment0, + partition1 -> assignment1 + )) + + val topicPartition0 = new TopicPartition(topic, partition0) + val topicPartition1 = new TopicPartition(topic, partition1) - TestUtils.assertLeader(client, topicPartition, broker2) + TestUtils.assertLeader(client, topicPartition0, broker2) + TestUtils.assertLeader(client, topicPartition1, broker3) - servers(broker2).shutdown() - TestUtils.assertLeader(client, topicPartition, broker3) - servers(broker2).startup() - TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2)) + cluster.shutdownBroker(broker2) + TestUtils.assertLeader(client, topicPartition0, broker3) + cluster.startBroker(broker2) + TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2)) + TestUtils.waitForBrokersInIsr(client, topicPartition1, 
Set(broker2)) + val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1)) + val output = TestUtils.grabConsoleOutput( LeaderElectionCommand.main( Array( - "--bootstrap-server", bootstrapServers(servers), + "--bootstrap-server", cluster.bootstrapServers(), "--election-type", "preferred", - "--all-topic-partitions" + "--path-to-json-file", topicPartitionPath.toString ) ) + ) - TestUtils.assertLeader(client, topicPartition, broker2) - } + val electionResultOutputIter = output.split("\n").iterator + + assertTrue(electionResultOutputIter.hasNext) + val firstLine = electionResultOutputIter.next() + assertTrue(firstLine.contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0"), + s"Unexpected output: $firstLine") + + assertTrue(electionResultOutputIter.hasNext) + val secondLine = electionResultOutputIter.next() + assertTrue(secondLine.contains(s"Valid replica already elected for partitions $topicPartition1"), + s"Unexpected output: $secondLine") } +} + +/** + * For some error cases, we can save a little build time by avoiding the overhead + * for cluster creation and cleanup. + */ +class LeaderElectionCommandErrorTest { Review comment: Good idea. I searched for "CommandErrorTest" and it looks like this is the first time we do this. Should we standardize this pattern and move this class to its own file? 
########## File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java ########## @@ -1052,6 +1114,160 @@ public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws ctx.replicationControl.getPartition(fooId, 1)); } + private void assertLeaderAndIsr( + ReplicationControlManager replication, + TopicIdPartition topicIdPartition, + int leaderId, + int[] isr + ) { + PartitionRegistration registration = replication.getPartition( + topicIdPartition.topicId(), + topicIdPartition.partitionId() + ); + assertArrayEquals(isr, registration.isr); + assertEquals(leaderId, registration.leader); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testElectUncleanLeaders(boolean electAllPartitions) throws Exception { + ReplicationControlTestContext ctx = new ReplicationControlTestContext(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3, 4); + ctx.unfenceBrokers(0, 1, 2, 3, 4); + + Uuid fooId = ctx.createTestTopic("foo", new int[][]{ + new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId(); + + TopicIdPartition partition0 = new TopicIdPartition(fooId, 0); + TopicIdPartition partition1 = new TopicIdPartition(fooId, 1); + TopicIdPartition partition2 = new TopicIdPartition(fooId, 2); + + ctx.fenceBrokers(Utils.mkSet(2, 3)); + ctx.fenceBrokers(Utils.mkSet(1, 2, 3)); + + assertLeaderAndIsr(replication, partition0, NO_LEADER, new int[]{1}); + assertLeaderAndIsr(replication, partition1, 4, new int[]{4}); + assertLeaderAndIsr(replication, partition2, 0, new int[]{0}); + + ElectLeadersRequestData request = buildElectLeadersRequest( + ElectionType.UNCLEAN, + electAllPartitions ? 
null : singletonMap("foo", asList(0, 1, 2)) + ); + + // No election can be done yet because no replicas are available for partition 0 + ControllerResult<ElectLeadersResponseData> result1 = replication.electLeaders(request); + assertEquals(Collections.emptyList(), result1.records()); + + ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, Utils.mkMap( + Utils.mkEntry( + new TopicPartition("foo", 0), + new ApiError(ELIGIBLE_LEADERS_NOT_AVAILABLE) + ), + Utils.mkEntry( + new TopicPartition("foo", 1), + new ApiError(ELECTION_NOT_NEEDED) + ), + Utils.mkEntry( + new TopicPartition("foo", 2), + new ApiError(ELECTION_NOT_NEEDED) + ) Review comment: I think we are not supposed to return partitions 1 and 2 if `null` is used in the elect leaders request. In other words, it is expected that only partitions that changed are returned when elect all partitions is used. ########## File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java ########## @@ -1052,6 +1114,160 @@ public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws ctx.replicationControl.getPartition(fooId, 1)); } + private void assertLeaderAndIsr( + ReplicationControlManager replication, + TopicIdPartition topicIdPartition, + int leaderId, + int[] isr + ) { + PartitionRegistration registration = replication.getPartition( + topicIdPartition.topicId(), + topicIdPartition.partitionId() + ); + assertArrayEquals(isr, registration.isr); + assertEquals(leaderId, registration.leader); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testElectUncleanLeaders(boolean electAllPartitions) throws Exception { + ReplicationControlTestContext ctx = new ReplicationControlTestContext(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3, 4); + ctx.unfenceBrokers(0, 1, 2, 3, 4); + + Uuid fooId = ctx.createTestTopic("foo", new int[][]{ + new int[]{1, 2, 3}, new int[]{2, 3, 4}, 
new int[]{0, 2, 1}}).topicId(); + + TopicIdPartition partition0 = new TopicIdPartition(fooId, 0); + TopicIdPartition partition1 = new TopicIdPartition(fooId, 1); + TopicIdPartition partition2 = new TopicIdPartition(fooId, 2); + + ctx.fenceBrokers(Utils.mkSet(2, 3)); + ctx.fenceBrokers(Utils.mkSet(1, 2, 3)); + + assertLeaderAndIsr(replication, partition0, NO_LEADER, new int[]{1}); + assertLeaderAndIsr(replication, partition1, 4, new int[]{4}); + assertLeaderAndIsr(replication, partition2, 0, new int[]{0}); + + ElectLeadersRequestData request = buildElectLeadersRequest( + ElectionType.UNCLEAN, + electAllPartitions ? null : singletonMap("foo", asList(0, 1, 2)) + ); + + // No election can be done yet because no replicas are available for partition 0 + ControllerResult<ElectLeadersResponseData> result1 = replication.electLeaders(request); + assertEquals(Collections.emptyList(), result1.records()); + + ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, Utils.mkMap( + Utils.mkEntry( + new TopicPartition("foo", 0), + new ApiError(ELIGIBLE_LEADERS_NOT_AVAILABLE) + ), + Utils.mkEntry( + new TopicPartition("foo", 1), + new ApiError(ELECTION_NOT_NEEDED) + ), + Utils.mkEntry( + new TopicPartition("foo", 2), + new ApiError(ELECTION_NOT_NEEDED) + ) Review comment: This comment applies to a few places in this file. 
########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -856,21 +867,24 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk, return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such partition as " + topic + "-" + partitionId); } + if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader()) + || (electionType == ElectionType.UNCLEAN && partition.hasLeader())) { + return new ApiError(Errors.ELECTION_NOT_NEEDED); + } + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, partitionId, r -> clusterControl.unfenced(r), - () -> uncleanOk || configurationControl.uncleanLeaderElectionEnabledForTopic(topic)); - builder.setAlwaysElectPreferredIfPossible(true); + () -> electionType == ElectionType.UNCLEAN); + + builder.setAlwaysElectPreferredIfPossible(electionType == ElectionType.PREFERRED); Review comment: Outside the scope of this change but do you think that it would help if internally we supported 3 elections: PREFERRED, ANY, UNCLEAN? With this enum we can change the constructor for `PartitionChangeBuilder` to take a function that returns this enum. Would that help with the API for `PartitionChangeBuilder` and address all of its uses? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org