niket-goel commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r686343189



##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,112 @@ private void 
testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), 
future1.get());
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {

Review comment:
       The timeout occurred because I accidentally deleted a line before pushing
the commit. I will fix that.
   I will discuss moving the test out with you offline.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -858,7 +858,7 @@ private void rescheduleMaybeFenceStaleBrokers() {
             return;
         }
         scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () 
-> {
-            ControllerResult<Void> result = 
replicationControl.maybeFenceStaleBrokers();
+            ControllerResult<Void> result = 
replicationControl.maybeFenceOneStaleBroker();
             rescheduleMaybeFenceStaleBrokers();

Review comment:
       Added tests in the next revision.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
##########
@@ -37,7 +42,13 @@
     private final LocalLogManagerTestEnv logEnv;
 
     public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv,
-                                   Consumer<QuorumController.Builder> 
builderConsumer)
+        Consumer<QuorumController.Builder> builderConsumer)

Review comment:
       Ack. I will restore the indentation to what it was before.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -925,18 +925,27 @@ ApiError electLeader(String topic, int partitionId, 
boolean uncleanOk,
         return ControllerResult.of(records, null);
     }
 
-    ControllerResult<Void> maybeFenceStaleBrokers() {
+    ControllerResult<Void> maybeFenceOneStaleBroker() {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerHeartbeatManager heartbeatManager = 
clusterControl.heartbeatManager();
-        List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
-        for (int brokerId : staleBrokers) {
+        Optional<Integer> staleBroker = heartbeatManager.findOneStaleBroker();
+        if (staleBroker.isPresent()) {

Review comment:
       Will do.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
##########
@@ -93,22 +93,26 @@ public void testFindStaleBrokers() {
         assertEquals(1, iter.next().id());
         assertEquals(2, iter.next().id());
         assertFalse(iter.hasNext());
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        assertEquals(Optional.empty(), manager.findOneStaleBroker());
 
         time.sleep(5);
-        assertEquals(Collections.singletonList(0), manager.findStaleBrokers());
+        assertEquals(Optional.of(0), manager.findOneStaleBroker());
         manager.fence(0);
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        assertEquals(Optional.empty(), manager.findOneStaleBroker());
         iter = manager.unfenced().iterator();
         assertEquals(1, iter.next().id());
         assertEquals(2, iter.next().id());
         assertFalse(iter.hasNext());
 
         time.sleep(20);
-        assertEquals(Arrays.asList(1, 2), manager.findStaleBrokers());
-        manager.fence(1);
-        manager.fence(2);
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        Integer nodeId = 1;
+        while (manager.findOneStaleBroker().isPresent()) {

Review comment:
       Ack, good catch! I should have added an assertion on the node count.
   I will just unroll the loop.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
##########
@@ -419,6 +419,27 @@ long nextCheckTimeNs() {
         }
     }
 
+    /**
+     * Check if the oldest broker to have hearbeated has already violated the
+     * sessionTimeoutNs timeout and needs to be fenced.
+     *
+     * @return      An Optional broker node id.
+     */
+    Optional<Integer> findOneStaleBroker() {
+        Optional<Integer> node = Optional.empty();
+        BrokerHeartbeatStateIterator iterator = unfenced.iterator();
+        if (iterator.hasNext()) {
+            BrokerHeartbeatState broker = iterator.next();
+            // The unfenced broker list is sorted on last contact time from 
each
+            // broker. If the first broker has a valid session then all do
+            if (!hasValidSession(broker)) {
+                node = Optional.of(broker.id);

Review comment:
       Will do.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -1029,6 +1051,22 @@ public void 
testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws
             ctx.replicationControl.getPartition(fooId, 1));
     }
 
+    @Test
+    public void testFenceMultipleBrokers() throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 
1}}).topicId();
+
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+
+        PartitionRegistration partition0 = replication.getPartition(fooId, 0);
+        assertArrayEquals(new int[]{1}, partition0.isr);

Review comment:
       Makes sense. I will add the leader assertion and see what else we can
assert here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to