rpuch commented on code in PR #6408:
URL: https://github.com/apache/ignite-3/pull/6408#discussion_r2284776705


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());

Review Comment:
   Let's statically import `Collectors.toList` so that this reads 
`collect(toList())`, which is closer to plain English.



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
+
+        // successful init
+        assertThat(
+                clusterManager.initClusterAsync(nodes, List.of(), "cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop last node [4].");
+        MockNode last = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last));
+
+        logger().info("Stop last node [3].");
+        MockNode last2 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last2));
+
+        // There should be still two learner nodes since the previous 
reconfiguration was blocked.
+        assertLearnerSize(2);
+
+        // Start nodes 3 and 4 back, so that the topology is back to normal 
and no node availability issues are expected.
+        logger().info("Start nodes [3] and [4].");
+        // Start node 4 first to avoid clashing with the earlier blocked 
message..
+        startNode(4, 5);
+        startNode(3, 5);
+
+        logger().info("Nodes started.");
+
+        // Waif for the nodes 3 and 4 to start.
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        assertLearnerSize(2);
+
+        for (MockNode node : cluster) {
+            Boolean leader = node.clusterManager().isCmgLeader().get();
+            if (leader) {
+                logger().info("lerner nodes {}", 
node.clusterManager().learnerNodes().get());
+            }
+        }
+
+        // Unblock the first reconfiguration.
+        logger().info("Unblock message.");
+        blockMessage.set(false);
+
+        assertLearnerSize(2);
+    }
+
+    private void assertLearnerSize(int size) throws InterruptedException {
+        assertTrue(waitForCondition(() ->
+                        cluster.stream()
+                                .filter(node -> {
+                                    try {
+                                        return 
node.clusterManager().isCmgLeader().get();

Review Comment:
   Why do we need to find the leader explicitly to get learners from CMG? Can't 
we get them from any node?



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
+
+        // successful init
+        assertThat(
+                clusterManager.initClusterAsync(nodes, List.of(), "cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop last node [4].");
+        MockNode last = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last));
+
+        logger().info("Stop last node [3].");
+        MockNode last2 = cluster.remove(cluster.size() - 1);

Review Comment:
   ```suggestion
           MockNode penult = cluster.remove(cluster.size() - 1);
   ```



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -132,6 +132,12 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
     /** Lock for the {@code raftService} field. */
     private final Object raftServiceLock = new Object();
 
+    /** Current updateLearners operation to ensure linearization of updates. */
+    private volatile CompletableFuture<Void> currentUpdateLearners = 
nullCompletedFuture();

Review Comment:
   `currentUpdateLearners` is always accessed under the corresponding lock, so 
`volatile` is redundant.



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
+
+        // successful init
+        assertThat(
+                clusterManager.initClusterAsync(nodes, List.of(), "cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop last node [4].");
+        MockNode last = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last));
+
+        logger().info("Stop last node [3].");
+        MockNode last2 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last2));
+
+        // There should be still two learner nodes since the previous 
reconfiguration was blocked.
+        assertLearnerSize(2);
+
+        // Start nodes 3 and 4 back, so that the topology is back to normal 
and no node availability issues are expected.
+        logger().info("Start nodes [3] and [4].");
+        // Start node 4 first to avoid clashing with the earlier blocked 
message..

Review Comment:
   What kind of a clash is this?



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());

Review Comment:
   ```suggestion
           List<String> votingNodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
   ```



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
+
+        // successful init
+        assertThat(
+                clusterManager.initClusterAsync(nodes, List.of(), "cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop last node [4].");
+        MockNode last = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last));
+
+        logger().info("Stop last node [3].");
+        MockNode last2 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last2));
+
+        // There should be still two learner nodes since the previous 
reconfiguration was blocked.
+        assertLearnerSize(2);
+
+        // Start nodes 3 and 4 back, so that the topology is back to normal 
and no node availability issues are expected.
+        logger().info("Start nodes [3] and [4].");
+        // Start node 4 first to avoid clashing with the earlier blocked 
message..
+        startNode(4, 5);
+        startNode(3, 5);
+
+        logger().info("Nodes started.");
+
+        // Waif for the nodes 3 and 4 to start.
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        assertLearnerSize(2);
+
+        for (MockNode node : cluster) {
+            Boolean leader = node.clusterManager().isCmgLeader().get();
+            if (leader) {
+                logger().info("lerner nodes {}", 
node.clusterManager().learnerNodes().get());
+            }
+        }
+
+        // Unblock the first reconfiguration.
+        logger().info("Unblock message.");
+        blockMessage.set(false);
+
+        assertLearnerSize(2);
+    }
+
+    private void assertLearnerSize(int size) throws InterruptedException {
+        assertTrue(waitForCondition(() ->
+                        cluster.stream()
+                                .filter(node -> {
+                                    try {
+                                        return 
node.clusterManager().isCmgLeader().get();
+                                    } catch (InterruptedException | 
ExecutionException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                })
+                                .mapToInt(node -> {
+                                    try {
+                                        return 
node.clusterManager().learnerNodes().get().size();

Review Comment:
   ```suggestion
                                           return 
node.clusterManager().learnerNodes().get(10, SECONDS).size();
   ```



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
+
+        // successful init
+        assertThat(
+                clusterManager.initClusterAsync(nodes, List.of(), "cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop last node [4].");
+        MockNode last = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last));
+
+        logger().info("Stop last node [3].");
+        MockNode last2 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last2));
+
+        // There should be still two learner nodes since the previous 
reconfiguration was blocked.
+        assertLearnerSize(2);
+
+        // Start nodes 3 and 4 back, so that the topology is back to normal 
and no node availability issues are expected.
+        logger().info("Start nodes [3] and [4].");
+        // Start node 4 first to avoid clashing with the earlier blocked 
message..
+        startNode(4, 5);
+        startNode(3, 5);
+
+        logger().info("Nodes started.");
+
+        // Waif for the nodes 3 and 4 to start.
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        assertLearnerSize(2);
+
+        for (MockNode node : cluster) {
+            Boolean leader = node.clusterManager().isCmgLeader().get();
+            if (leader) {
+                logger().info("lerner nodes {}", 
node.clusterManager().learnerNodes().get());
+            }
+        }
+
+        // Unblock the first reconfiguration.
+        logger().info("Unblock message.");
+        blockMessage.set(false);
+
+        assertLearnerSize(2);
+    }
+
+    private void assertLearnerSize(int size) throws InterruptedException {
+        assertTrue(waitForCondition(() ->
+                        cluster.stream()
+                                .filter(node -> {
+                                    try {
+                                        return 
node.clusterManager().isCmgLeader().get();

Review Comment:
   ```suggestion
                                           return 
node.clusterManager().isCmgLeader().get(10, SECONDS);
   ```



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
+
+        // successful init
+        assertThat(
+                clusterManager.initClusterAsync(nodes, List.of(), "cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop last node [4].");
+        MockNode last = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last));
+
+        logger().info("Stop last node [3].");
+        MockNode last2 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last2));
+
+        // There should be still two learner nodes since the previous 
reconfiguration was blocked.
+        assertLearnerSize(2);
+
+        // Start nodes 3 and 4 back, so that the topology is back to normal 
and no node availability issues are expected.
+        logger().info("Start nodes [3] and [4].");
+        // Start node 4 first to avoid clashing with the earlier blocked 
message..

Review Comment:
   ```suggestion
           // Start node 4 first to avoid clashing with the earlier blocked 
message.
   ```



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
+
+        // successful init
+        assertThat(
+                clusterManager.initClusterAsync(nodes, List.of(), "cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop last node [4].");
+        MockNode last = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last));
+
+        logger().info("Stop last node [3].");
+        MockNode last2 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last2));
+
+        // There should be still two learner nodes since the previous 
reconfiguration was blocked.
+        assertLearnerSize(2);
+
+        // Start nodes 3 and 4 back, so that the topology is back to normal 
and no node availability issues are expected.
+        logger().info("Start nodes [3] and [4].");
+        // Start node 4 first to avoid clashing with the earlier blocked 
message..
+        startNode(4, 5);
+        startNode(3, 5);
+
+        logger().info("Nodes started.");
+
+        // Waif for the nodes 3 and 4 to start.
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        assertLearnerSize(2);
+
+        for (MockNode node : cluster) {
+            Boolean leader = node.clusterManager().isCmgLeader().get();
+            if (leader) {
+                logger().info("lerner nodes {}", 
node.clusterManager().learnerNodes().get());
+            }
+        }
+
+        // Unblock the first reconfiguration.
+        logger().info("Unblock message.");
+        blockMessage.set(false);
+
+        assertLearnerSize(2);
+    }
+
+    private void assertLearnerSize(int size) throws InterruptedException {
+        assertTrue(waitForCondition(() ->
+                        cluster.stream()
+                                .filter(node -> {
+                                    try {
+                                        return 
node.clusterManager().isCmgLeader().get();
+                                    } catch (InterruptedException | 
ExecutionException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                })
+                                .mapToInt(node -> {
+                                    try {
+                                        return 
node.clusterManager().learnerNodes().get().size();
+                                    } catch (InterruptedException | 
ExecutionException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                })
+                                .min().orElseThrow() == size,
+                30_000
+        ));
+    }
+
+    private void blockMessage(BiPredicate<String, NetworkMessage> predicate) {
+        cluster.stream().map(node -> 
node.clusterService().messagingService()).forEach(msg -> {

Review Comment:
   ```suggestion
           cluster.stream().map(node -> 
node.clusterService().messagingService()).forEach(messagingService -> {
   ```



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
+
+        // successful init
+        assertThat(
+                clusterManager.initClusterAsync(nodes, List.of(), "cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop last node [4].");
+        MockNode last = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last));
+
+        logger().info("Stop last node [3].");
+        MockNode last2 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last2));
+
+        // There should be still two learner nodes since the previous 
reconfiguration was blocked.
+        assertLearnerSize(2);

Review Comment:
   Should we give the test some time to remove a learner (if the production 
code works incorrectly and DOES remove a learner)? Otherwise, we might just 
miss the removal because we check the number of learners immediately after 
stopping the two nodes.
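
   A minimal sketch of such a "still holds after some time" check. The class and method names (`HoldsForSketch`, `holdsFor`) and the 10 ms polling interval are assumptions made for illustration; they are not part of the PR or of the existing test utilities.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not from the PR: checks that a condition keeps holding for a while. */
public class HoldsForSketch {
    /**
     * Polls the condition for roughly durationMillis.
     * Returns false as soon as one poll sees the condition violated, true otherwise.
     */
    public static boolean holdsFor(BooleanSupplier condition, long durationMillis) {
        long start = System.nanoTime();
        long durationNanos = durationMillis * 1_000_000L;

        do {
            if (!condition.getAsBoolean()) {
                return false;
            }

            try {
                Thread.sleep(10); // Polling interval, chosen arbitrarily for the sketch.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
        } while (System.nanoTime() - start < durationNanos);

        return true;
    }

    public static void main(String[] args) {
        System.out.println(holdsFor(() -> true, 50));
        System.out.println(holdsFor(() -> false, 50));
    }
}
```

   In the test this could look like `assertTrue(holdsFor(() -> learnerCount() == 2, 1_000))`, where `learnerCount()` is a hypothetical accessor for the current number of learners. If the production code wrongly removes a learner, the check would have a chance to observe it.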



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -950,23 +956,49 @@ private static RaftNodeId raftNodeId(Peer serverPeer) {
         return new RaftNodeId(CmgGroupId.INSTANCE, serverPeer);
     }
 
+    /**
+     * Serializes calls to updateLearners to prevent race conditions between 
multiple topology changes
+     * and leader election callbacks.
+     *
+     * @param service CMG Raft service.
+     * @param term RAFT term.
+     * @param checkLeadership Whether to check leadership before updating 
learners.
+     */
+    private CompletableFuture<Void> updateLearnersSerially(CmgRaftService 
service, long term, boolean checkLeadership) {
+        synchronized (updateLearnersLock) {
+            currentUpdateLearners = currentUpdateLearners
+                    .thenCompose(v -> {
+                        if (checkLeadership) {
+                            return 
service.isCurrentNodeLeader().thenCompose(isLeader -> {
+                                if (!isLeader) {
+                                    return nullCompletedFuture();
+                                }
+                                return service.updateLearners(term);
+                            });
+                        } else {
+                            return service.updateLearners(term);
+                        }
+                    })
+                    .whenComplete((unused, throwable) -> {
+                        if (throwable != null) {
+                            LOG.error("Failed to reset learners", throwable);

Review Comment:
   If an exception occurs here, the chain stays completed exceptionally, so all subsequent attempts to update learners will fail with the same exception. Hence, a few points:
   
   1. If we care about learner set consistency, we should probably call the FailureProcessor here, because the learner set will no longer be consistent once an update fails.
   2. If we don't care about learner set consistency (and can afford to miss an update), we should translate the exception (for example, with `exceptionally()`) so that subsequent updates can still be applied.
   3. If we don't translate it and leave the chain broken forever, we should log the error message and the exception only the first time, noting in the log message that no more learner updates will be applied (as we did in WatchProcessor). Subsequent attempts (which will also fail) should either not log at all or log only a brief message without the exception.
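
   To illustrate the point about the broken chain, here is a small self-contained sketch using only `java.util.concurrent.CompletableFuture`. The names (`SerialChainSketch`, `enqueue`, `runsAfterFailure`) are hypothetical and this is not the PR's code. It only shows that a failed stage makes every later `thenCompose` skip its function, and that translating the failure with `exceptionally()` (option 2) keeps the chain usable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch of a serialized update chain; not the code from the PR. */
public class SerialChainSketch {
    private final Object lock = new Object();

    /** Tail of the chain; guarded by the lock, so it does not need to be volatile. */
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    private final boolean recover;

    public SerialChainSketch(boolean recover) {
        this.recover = recover;
    }

    /** Appends an update to the chain; the caller still sees the update's own outcome. */
    public CompletableFuture<Void> enqueue(Supplier<CompletableFuture<Void>> update) {
        synchronized (lock) {
            CompletableFuture<Void> result = tail.thenCompose(v -> update.get());

            // Option 2: translate the failure so that the next update is not skipped.
            tail = recover ? result.exceptionally(t -> null) : result;

            return result;
        }
    }

    /** Counts how many times the second update actually runs after the first one failed. */
    public static int runsAfterFailure(boolean recover) {
        SerialChainSketch chain = new SerialChainSketch(recover);
        AtomicInteger runs = new AtomicInteger();

        chain.enqueue(() -> CompletableFuture.failedFuture(new IllegalStateException("boom")));
        chain.enqueue(() -> {
            runs.incrementAndGet();
            return CompletableFuture.completedFuture(null);
        });

        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runsAfterFailure(false)); // 0: the chain is broken, the update is skipped
        System.out.println(runsAfterFailure(true));  // 1: the failure was translated, the update runs
    }
}
```

   Without the translation, the second update never runs, which matches the concern above. With it, a failed update is simply missed and later ones proceed, so this only fits if missing an update is acceptable.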



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
+
+        // successful init
+        assertThat(
+                clusterManager.initClusterAsync(nodes, List.of(), "cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop last node [4].");
+        MockNode last = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last));
+
+        logger().info("Stop last node [3].");
+        MockNode last2 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last2));
+
+        // There should be still two learner nodes since the previous 
reconfiguration was blocked.
+        assertLearnerSize(2);
+
+        // Start nodes 3 and 4 back, so that the topology is back to normal 
and no node availability issues are expected.
+        logger().info("Start nodes [3] and [4].");
+        // Start node 4 first to avoid clashing with the earlier blocked 
message..
+        startNode(4, 5);
+        startNode(3, 5);
+
+        logger().info("Nodes started.");
+
+        // Waif for the nodes 3 and 4 to start.
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        assertLearnerSize(2);
+
+        for (MockNode node : cluster) {
+            Boolean leader = node.clusterManager().isCmgLeader().get();
+            if (leader) {
+                logger().info("lerner nodes {}", 
node.clusterManager().learnerNodes().get());

Review Comment:
   ```suggestion
                   logger().info("Learner nodes {}", 
node.clusterManager().learnerNodes().get(10, SECONDS));
   ```
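
The point of the suggestion above is that a bounded `get` turns a hung future into a test failure instead of a hung test run. A small sketch of the difference, using only `java.util.concurrent`; `BoundedWait` and `describe` are hypothetical names made up for illustration, and the 100 ms timeout is arbitrary:

```java
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for illustration only. */
class BoundedWait {
    /** Waits for the future for at most timeoutMillis and reports a timeout instead of hanging. */
    static String describe(CompletableFuture<Boolean> future, long timeoutMillis) {
        try {
            return "completed with " + future.get(timeoutMillis, MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // A future that is already done returns immediately.
        System.out.println(describe(CompletableFuture.completedFuture(true), 100));

        // A future that is never completed would block a plain get() forever.
        System.out.println(describe(new CompletableFuture<>(), 100));
    }
}
```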



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java:
##########
@@ -281,6 +281,21 @@ private ClusterNodeMessage nodeMessage(ClusterNode node) {
                 .build();
     }
 
+    /**
+     * Returns a known set of consistent IDs of the learners nodes of the CMG.
+     */
+    public CompletableFuture<Set<String>> learners() {
+        List<Peer> currentLearners = raftService.learners();
+
+        if (currentLearners == null) {
+            return raftService.refreshMembers(true).thenCompose(v -> 
learners());
+        }
+
+        return completedFuture(currentLearners.stream()
+                .map(Peer::consistentId)
+                .collect(toCollection(HashSet::new)));

Review Comment:
   Why is it collected to a `HashSet`?
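
For comparison, the standard collectors that do not name a concrete set type are sketched below. Whether either fits here depends on what callers do with the result, which this comment does not establish; `LearnerIds` and its methods are hypothetical names, and `toUnmodifiableSet()` assumes Java 10+.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration only. */
class LearnerIds {
    /** Collects to some Set implementation; the concrete type and mutability are unspecified. */
    static Set<String> asSet(List<String> ids) {
        return ids.stream().collect(Collectors.toSet());
    }

    /** Collects to a set that rejects modification; null elements are not allowed. */
    static Set<String> asUnmodifiableSet(List<String> ids) {
        return ids.stream().collect(Collectors.toUnmodifiableSet());
    }

    public static void main(String[] args) {
        List<String> ids = List.of("node3", "node4", "node3");

        System.out.println(asSet(ids).size());
        System.out.println(asUnmodifiableSet(ids).size());
    }
}
```

`toCollection(HashSet::new)` is the usual choice only when the caller needs a guaranteed mutable `HashSet`.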



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
+
+        // successful init
+        assertThat(
+                clusterManager.initClusterAsync(nodes, List.of(), "cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop last node [4].");
+        MockNode last = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last));
+
+        logger().info("Stop last node [3].");
+        MockNode last2 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last2));
+
+        // There should be still two learner nodes since the previous 
reconfiguration was blocked.
+        assertLearnerSize(2);
+
+        // Start nodes 3 and 4 back, so that the topology is back to normal 
and no node availability issues are expected.
+        logger().info("Start nodes [3] and [4].");
+        // Start node 4 first to avoid clashing with the earlier blocked 
message..
+        startNode(4, 5);
+        startNode(3, 5);
+
+        logger().info("Nodes started.");
+
+        // Waif for the nodes 3 and 4 to start.
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        assertLearnerSize(2);
+
+        for (MockNode node : cluster) {
+            Boolean leader = node.clusterManager().isCmgLeader().get();
+            if (leader) {
+                logger().info("lerner nodes {}", 
node.clusterManager().learnerNodes().get());
+            }
+        }
+
+        // Unblock the first reconfiguration.
+        logger().info("Unblock message.");
+        blockMessage.set(false);
+

Review Comment:
   Again, the async message exchanges are given no time to complete. Also, 
this test will pass even if learners cannot be changed at all, right?
   
   It seems more reliable to collect all configuration changes and then 
analyze them.
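
One way to read "collect all configuration changes and then analyze them" is sketched below. Everything here is an assumption for illustration: `LearnerConfigRecorder` is a hypothetical test helper, and how `record` would be hooked into the cluster (for example, from a message interceptor or a Raft configuration listener) is left open. It assumes Java 10+ for `Set.copyOf` and `List.copyOf`.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical test helper; not part of the Ignite test framework. */
class LearnerConfigRecorder {
    private final List<Set<String>> history = new CopyOnWriteArrayList<>();

    /** Called from whatever hook observes an applied learners configuration (an assumption here). */
    void record(Set<String> learners) {
        history.add(Set.copyOf(learners));
    }

    /** Number of configurations applied so far; zero means the learners were never changed. */
    int changes() {
        return history.size();
    }

    /** Returns true if 'stale' was applied at any point after 'fresh' was first applied. */
    boolean appliedAfter(Set<String> stale, Set<String> fresh) {
        List<Set<String>> snapshot = List.copyOf(history);
        int firstFresh = snapshot.indexOf(fresh);

        return firstFresh >= 0 && snapshot.lastIndexOf(stale) > firstFresh;
    }

    public static void main(String[] args) {
        LearnerConfigRecorder ordered = new LearnerConfigRecorder();
        ordered.record(Set.of("node3"));
        ordered.record(Set.of("node3", "node4"));

        LearnerConfigRecorder reordered = new LearnerConfigRecorder();
        reordered.record(Set.of("node3", "node4"));
        reordered.record(Set.of("node3"));

        System.out.println(ordered.appliedAfter(Set.of("node3"), Set.of("node3", "node4")));   // false
        System.out.println(reordered.appliedAfter(Set.of("node3"), Set.of("node3", "node4"))); // true
    }
}
```

A test built this way can assert both that `changes()` is non-zero, so it fails if learners cannot be changed at all, and that the blocked configuration never lands after a newer one.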



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -290,6 +296,126 @@ void testInitInvalidNodesAsync() throws Exception {
         );
     }
 
+    @Test
+    void testNoConfigurationReordering() throws Exception {
+        startCluster(5);
+
+        ClusterManagementGroupManager clusterManager = 
cluster.get(0).clusterManager();
+
+        List<String> nodes = 
cluster.stream().map(MockNode::name).limit(3).collect(Collectors.toList());
+
+        // successful init
+        assertThat(
+                clusterManager.initClusterAsync(nodes, List.of(), "cluster"),
+                willCompleteSuccessfully()
+        );
+
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        // Wait for the initial cluster reconfiguration to complete.
+        assertLearnerSize(2);
+
+        String node3Name = cluster.get(3).name();
+
+        AtomicBoolean blockMessage = new AtomicBoolean(true);
+
+        // Block the first reconfiguration to simulate network issues.
+        // We stop node 4, that should produce a ResetLearnersRequest with 
only one learner - node 3.
+        blockMessage((recipientName, networkMessage) -> {
+            if (!blockMessage.get()) {
+                return false;
+            }
+
+            if (networkMessage instanceof ResetLearnersRequest) {
+                ResetLearnersRequest rlr = (ResetLearnersRequest) 
networkMessage;
+
+                if (rlr.learnersList().contains(node3Name) &&  
rlr.learnersList().size() == 1) {
+                    logger().info("Block message {} to {}", networkMessage, 
recipientName);
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        logger().info("Stop last node [4].");
+        MockNode last = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last));
+
+        logger().info("Stop last node [3].");
+        MockNode last2 = cluster.remove(cluster.size() - 1);
+        stopNodes(List.of(last2));
+
+        // There should be still two learner nodes since the previous 
reconfiguration was blocked.
+        assertLearnerSize(2);
+
+        // Start nodes 3 and 4 back, so that the topology is back to normal 
and no node availability issues are expected.
+        logger().info("Start nodes [3] and [4].");
+        // Start node 4 first to avoid clashing with the earlier blocked 
message..
+        startNode(4, 5);
+        startNode(3, 5);
+
+        logger().info("Nodes started.");
+
+        // Waif for the nodes 3 and 4 to start.
+        for (MockNode node : cluster) {
+            assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
+        }
+
+        assertLearnerSize(2);
+
+        for (MockNode node : cluster) {
+            Boolean leader = node.clusterManager().isCmgLeader().get();

Review Comment:
   ```suggestion
               Boolean leader = node.clusterManager().isCmgLeader().get(10, 
SECONDS);
   ```



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequestProcessor.java:
##########
@@ -52,21 +53,16 @@ public void handleRequest(final RpcContext rpcCtx, final T 
request) {
             if (msg != null) {
                 rpcCtx.sendResponse(msg);
             }
+        } catch (NotLeaderException t) {

Review Comment:
   Is this fix required for the linearization of CMG learner updates? If not, 
would it make sense to split it out into another PR? By itself, it looks like 
an important fix, and it would be odd to see it squashed into an unrelated 
change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


