hachikuji commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r687186647



##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;
+        Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                            setListeners(listeners));
+                    brokerEpochs.put(brokerId, reply.get().epoch());
+                }
+
+                // Brokers are only registered but still fenced
+                // Topic creation with no available unfenced brokers should 
fail
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("foo").setNumPartitions(1).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),

Review comment:
       This part is a little mysterious to me. We return an error, but the 
topic is created anyway? That seems surprising.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;
+        Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                            setListeners(listeners));
+                    brokerEpochs.put(brokerId, reply.get().epoch());
+                }
+
+                // Brokers are only registered but still fenced
+                // Topic creation with no available unfenced brokers should 
fail
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("foo").setNumPartitions(1).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+                    createTopicsResponseData.topics().find("foo").errorCode());
+                assertEquals("Unable to replicate the partition " + 
replicationFactor + " time(s): All brokers " +
+                    "are currently fenced.", 
createTopicsResponseData.topics().find("foo").errorMessage());
+
+                // Unfence all brokers
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                createTopicsResponseData = active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("foo").errorCode());

Review comment:
       nit: it's a small thing, but the assertion failure message is more useful if we compare `Errors` values instead of raw error codes:
   ```java
   assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
   ```

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;

Review comment:
       nit: maybe we can use milliseconds consistently and drop some of the unit conversions below.
   
   Also, could this and `sleepMillis` be a primitive `long` instead of a boxed `Long`?

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;
+        Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).

Review comment:
       Perhaps we can use `Uuid.randomUuid()`? It's a little weird for all brokers to have the same incarnation ID.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;
+        Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                            setListeners(listeners));
+                    brokerEpochs.put(brokerId, reply.get().epoch());
+                }
+
+                // Brokers are only registered but still fenced
+                // Topic creation with no available unfenced brokers should 
fail
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("foo").setNumPartitions(1).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+                    createTopicsResponseData.topics().find("foo").errorCode());
+                assertEquals("Unable to replicate the partition " + 
replicationFactor + " time(s): All brokers " +
+                    "are currently fenced.", 
createTopicsResponseData.topics().find("foo").errorMessage());
+
+                // Unfence all brokers
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                createTopicsResponseData = active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("foo").errorCode());
+                Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                Thread.sleep(sleepMillis);
+
+                // All brokers should still be unfenced
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    
assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+                        "Broker " + brokerId + " should have been unfenced");
+                }
+                createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("bar").setNumPartitions(1).
+                                setConfigs(new 
CreateableTopicConfigCollection(Collections.
+                                    singleton(new 
CreateableTopicConfig().setName("min.insync.replicas").

Review comment:
       Is there a point to setting `min.insync.replicas` in this test? I am 
wondering why we don't just reuse the original create request.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;

Review comment:
       Maybe a little subjective, but I think the test case would be more readable if we listed these brokers explicitly. For example:
   ```java
   List<Integer> allBrokers = Arrays.asList(1, 2, 3, 4, 5);
   List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
   ```

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;
+        Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =

Review comment:
       nit: we can combine these two `try` blocks:
   ```java
   try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty());
        QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS), 
Optional.of(sessionTimeoutSec))) {
   ...
   ```

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
##########
@@ -36,16 +41,29 @@
     private final List<QuorumController> controllers;
     private final LocalLogManagerTestEnv logEnv;
 
-    public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv,
-                                   Consumer<QuorumController.Builder> 
builderConsumer)
-                                   throws Exception {
+    public QuorumControllerTestEnv(
+        LocalLogManagerTestEnv logEnv,
+        Consumer<QuorumController.Builder> builderConsumer
+    ) throws Exception {
+        this(logEnv, builderConsumer, Optional.empty());
+    }
+
+    public QuorumControllerTestEnv(
+        LocalLogManagerTestEnv logEnv,
+        Consumer<Builder> builderConsumer,
+        Optional<Long> sessionTimeoutSeconds

Review comment:
       nit: as above, can we use milliseconds? It's the most conventional unit in the codebase for configurations that represent durations.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,121 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;
+        Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> 
b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                            setListeners(listeners));
+                    brokerEpochs.put(brokerId, reply.get().epoch());
+                }
+
+                // Brokers are only registered but still fenced
+                // Topic creation with no available unfenced brokers should 
fail
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("foo").setNumPartitions(1).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+                    createTopicsResponseData.topics().find("foo").errorCode());
+                assertEquals("Unable to replicate the partition " + 
replicationFactor + " time(s): All brokers " +
+                    "are currently fenced.", 
createTopicsResponseData.topics().find("foo").errorMessage());
+
+                // Unfence all brokers
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                createTopicsResponseData = active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("foo").errorCode());
+                Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                Thread.sleep(sleepMillis);
+
+                // All brokers should still be unfenced
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    
assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+                        "Broker " + brokerId + " should have been unfenced");
+                }
+                createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName("bar").setNumPartitions(1).
+                                setConfigs(new 
CreateableTopicConfigCollection(Collections.
+                                    singleton(new 
CreateableTopicConfig().setName("min.insync.replicas").
+                                        setValue("2")).iterator())).
+                                
setReplicationFactor(replicationFactor)).iterator()));
+                createTopicsResponseData = 
active.createTopics(createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), 
createTopicsResponseData.topics().find("bar").errorCode());
+                Uuid topicIdBar = 
createTopicsResponseData.topics().find("bar").topicId();
+
+                // Fence some of the brokers
+                TestUtils.waitForCondition(() -> {
+                        sendBrokerheartbeat(active, brokersToKeepUnfenced, 
brokerEpochs);
+                        for (int i = brokersToKeepUnfenced; i < brokerCount; 
i++) {
+                            if 
(active.replicationControl().isBrokerUnfenced(i)) {
+                                return false;
+                            }
+                        }
+                        return true;
+                    }, sessionTimeoutSec * 2 * 1000,
+                    "Fencing of brokers did not process within expected time"
+                );
+
+                // At this point the brokers we want fenced are fenced.
+                // Send another heartbeat to the brokers we want to keep alive
+                sendBrokerheartbeat(active, brokersToKeepUnfenced, 
brokerEpochs);
+                int[] expectedIsr = new int[brokersToKeepUnfenced];

Review comment:
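       If we create the topic with an explicit assignment, then I think we can remove this logic that builds the expected ISR, since we would be able to assert the … (the rest of this comment was truncated in the email notification; see the GitHub discussion link above)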




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to