niket-goel commented on a change in pull request #11191: URL: https://github.com/apache/kafka/pull/11191#discussion_r686498956
########## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ########## @@ -173,6 +175,127 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get()); } + @Test + public void testFenceMultipleBrokers() throws Throwable { + int brokerCount = 5; + int brokersToKeepUnfenced = 1; + short replicationFactor = 5; + Long sessionTimeout = 1L; + Long sleepMillis = (sessionTimeout * 1000) / 2; + + try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { + try (QuorumControllerTestEnv controlEnv = + new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeout))) { + ListenerCollection listeners = new ListenerCollection(); + listeners.add(new Listener().setName("PLAINTEXT"). + setHost("localhost").setPort(9092)); + QuorumController active = controlEnv.activeController(); + Map<Integer, Long> brokerEpochs = new HashMap<>(); + for (int brokerId = 0; brokerId < brokerCount; brokerId++) { + CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker( + new BrokerRegistrationRequestData(). + setBrokerId(brokerId). + setClusterId("06B-K3N1TBCNYFgruEVP0Q"). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). + setListeners(listeners)); + brokerEpochs.put(brokerId, reply.get().epoch()); + } + + // Brokers are only registered but still fenced + // Topic creation with no available unfenced brokers should fail + CreateTopicsRequestData createTopicsRequestData = + new CreateTopicsRequestData().setTopics( + new CreatableTopicCollection(Collections.singleton( + new CreatableTopic().setName("foo").setNumPartitions(1). 
+ setReplicationFactor(replicationFactor)).iterator())); + CreateTopicsResponseData createTopicsResponseData = active.createTopics( + createTopicsRequestData).get(); + assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), + createTopicsResponseData.topics().find("foo").errorCode()); + assertEquals("Unable to replicate the partition " + replicationFactor + " time(s): All brokers " + + "are currently fenced.", createTopicsResponseData.topics().find("foo").errorMessage()); + + // Unfence all brokers + sendBrokerheartbeat(active, brokerCount, brokerEpochs); + createTopicsResponseData = active.createTopics( + createTopicsRequestData).get(); + assertEquals(Errors.NONE.code(), createTopicsResponseData.topics().find("foo").errorCode()); + Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId(); + sendBrokerheartbeat(active, brokerCount, brokerEpochs); + Thread.sleep(sleepMillis); + + // All brokers should still be unfenced + for (int brokerId = 0; brokerId < brokerCount; brokerId++) { + assertTrue(active.replicationControl().isBrokerUnfenced(brokerId), + "Broker " + brokerId + " should have been unfenced"); + } + createTopicsRequestData = new CreateTopicsRequestData().setTopics( + new CreatableTopicCollection(Collections.singleton( + new CreatableTopic().setName("bar").setNumPartitions(1). + setConfigs(new CreateableTopicConfigCollection(Collections. + singleton(new CreateableTopicConfig().setName("min.insync.replicas"). + setValue("2")).iterator())). 
+ setReplicationFactor(replicationFactor)).iterator())); + createTopicsResponseData = active.createTopics(createTopicsRequestData).get(); + assertEquals(Errors.NONE.code(), createTopicsResponseData.topics().find("bar").errorCode()); + Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId(); + + // Fence some of the brokers + boolean fencingComplete; + Long waitIterations = 0L; + do { + fencingComplete = true; + sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs); + for (int i = brokersToKeepUnfenced; i < brokerCount; i++) { + if (active.replicationControl().isBrokerUnfenced(i)) { + fencingComplete = false; + } + } + Thread.sleep(1000); + waitIterations++; + + if (waitIterations >= sessionTimeout * 3) { + assertTrue(false, "Fencing of brokers did not process within expected time"); + } + } while (!fencingComplete); Review comment: Will check this out. Thanks for the pointer. ########## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java ########## @@ -173,6 +175,127 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get()); } + @Test + public void testFenceMultipleBrokers() throws Throwable { + int brokerCount = 5; + int brokersToKeepUnfenced = 1; + short replicationFactor = 5; + Long sessionTimeout = 1L; + Long sleepMillis = (sessionTimeout * 1000) / 2; + + try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { + try (QuorumControllerTestEnv controlEnv = + new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeout))) { + ListenerCollection listeners = new ListenerCollection(); + listeners.add(new Listener().setName("PLAINTEXT"). 
+ setHost("localhost").setPort(9092)); + QuorumController active = controlEnv.activeController(); + Map<Integer, Long> brokerEpochs = new HashMap<>(); + for (int brokerId = 0; brokerId < brokerCount; brokerId++) { + CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker( + new BrokerRegistrationRequestData(). + setBrokerId(brokerId). + setClusterId("06B-K3N1TBCNYFgruEVP0Q"). + setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). + setListeners(listeners)); + brokerEpochs.put(brokerId, reply.get().epoch()); + } + + // Brokers are only registered but still fenced + // Topic creation with no available unfenced brokers should fail + CreateTopicsRequestData createTopicsRequestData = + new CreateTopicsRequestData().setTopics( + new CreatableTopicCollection(Collections.singleton( + new CreatableTopic().setName("foo").setNumPartitions(1). + setReplicationFactor(replicationFactor)).iterator())); + CreateTopicsResponseData createTopicsResponseData = active.createTopics( + createTopicsRequestData).get(); + assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), + createTopicsResponseData.topics().find("foo").errorCode()); + assertEquals("Unable to replicate the partition " + replicationFactor + " time(s): All brokers " + + "are currently fenced.", createTopicsResponseData.topics().find("foo").errorMessage()); + + // Unfence all brokers + sendBrokerheartbeat(active, brokerCount, brokerEpochs); + createTopicsResponseData = active.createTopics( + createTopicsRequestData).get(); + assertEquals(Errors.NONE.code(), createTopicsResponseData.topics().find("foo").errorCode()); + Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId(); + sendBrokerheartbeat(active, brokerCount, brokerEpochs); + Thread.sleep(sleepMillis); + + // All brokers should still be unfenced + for (int brokerId = 0; brokerId < brokerCount; brokerId++) { + assertTrue(active.replicationControl().isBrokerUnfenced(brokerId), + "Broker " + brokerId + " should have been 
unfenced"); + } + createTopicsRequestData = new CreateTopicsRequestData().setTopics( + new CreatableTopicCollection(Collections.singleton( + new CreatableTopic().setName("bar").setNumPartitions(1). + setConfigs(new CreateableTopicConfigCollection(Collections. + singleton(new CreateableTopicConfig().setName("min.insync.replicas"). + setValue("2")).iterator())). + setReplicationFactor(replicationFactor)).iterator())); + createTopicsResponseData = active.createTopics(createTopicsRequestData).get(); + assertEquals(Errors.NONE.code(), createTopicsResponseData.topics().find("bar").errorCode()); + Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId(); + + // Fence some of the brokers + boolean fencingComplete; + Long waitIterations = 0L; + do { + fencingComplete = true; + sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs); + for (int i = brokersToKeepUnfenced; i < brokerCount; i++) { + if (active.replicationControl().isBrokerUnfenced(i)) { + fencingComplete = false; + } + } + Thread.sleep(1000); + waitIterations++; + + if (waitIterations >= sessionTimeout * 3) { + assertTrue(false, "Fencing of brokers did not process within expected time"); Review comment: Thanks (new to the project and to Java :) ) ########## File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java ########## @@ -208,6 +212,24 @@ void unfenceBrokers(Integer... brokerIds) throws Exception { } } + void fenceBrokers(Set<Integer> brokerIds) throws Exception { + time.sleep(BROKER_SESSION_TIMEOUT_MS); Review comment: Good call, although I would not do it in the common utility function. I will do it in the test method where this should hold. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org