ibessonov commented on code in PR #4403:
URL: https://github.com/apache/ignite-3/pull/4403#discussion_r1763209130


##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -264,11 +264,8 @@ public ClusterManagementGroupManager(
      * @param cmgNodeNames Names of nodes that will host the Cluster 
Management Group.
      * @param clusterName Human-readable name of the cluster.
      */
-    public void initCluster(
-            Collection<String> metaStorageNodeNames,
-            Collection<String> cmgNodeNames,
-            String clusterName
-    ) throws NodeStoppingException {
+    public void initCluster(Collection<String> metaStorageNodeNames, 
Collection<String> cmgNodeNames, String clusterName)

Review Comment:
   This formatting looked fine, what was the problem?



##########
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.disaster.system.repair.MetastorageRepair;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implementation of {@link MetastorageRepair}.
+ */
+public class MetastorageRepairImpl implements MetastorageRepair {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MetastorageRepairImpl.class);
+
+    private final MessagingService messagingService;
+    private final LogicalTopology logicalTopology;
+    private final ClusterManagementGroupManager cmgManager;
+
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
+
+    /** Constructor. */
+    public MetastorageRepairImpl(
+            MessagingService messagingService,
+            LogicalTopology logicalTopology,
+            ClusterManagementGroupManager cmgManager
+    ) {
+        this.messagingService = messagingService;
+        this.logicalTopology = logicalTopology;
+        this.cmgManager = cmgManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> repair(Set<String> participatingNodeNames, 
int metastorageReplicationFactor) {
+        LOG.info("Starting MG repair [participatingNodes={}, 
replicationFactor={}].", participatingNodeNames, metastorageReplicationFactor);
+
+        return waitTillValidatedNodesContain(participatingNodeNames)
+                .thenCompose(unused -> 
collectMetastorageIndexes(participatingNodeNames))
+                .thenCompose(indexes -> {
+                    LOG.info("Collected metastorage indexes [indexes={}].", 
indexes);
+
+                    Set<String> newMgNodes = nodesWithBestIndexes(indexes, 
metastorageReplicationFactor);
+                    LOG.info("Chose new MG nodes [mgNodes={}].", newMgNodes);
+
+                    String bestNodeName = chooseNodeWithBestIndex(indexes, 
newMgNodes);
+                    LOG.info("Chose best MG node [node={}].", bestNodeName);
+
+                    return cmgManager.changeMetastorageNodes(newMgNodes)
+                            .thenCompose(unused -> appointLeader(bestNodeName, 
indexes.get(bestNodeName).term(), newMgNodes))
+                            .thenRun(() -> LOG.info("Appointed MG leader 
forcefully [leader={}].", bestNodeName));
+                });
+    }
+
+    private CompletableFuture<Void> waitTillValidatedNodesContain(Set<String> 
nodeNames) {
+        Set<String> cumulativeValidatedNodeNames = 
ConcurrentHashMap.newKeySet();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        LogicalTopologyEventListener listener = new 
LogicalTopologyEventListener() {
+            @Override
+            public void onNodeValidated(LogicalNode validatedNode) {
+                cumulativeValidatedNodeNames.add(validatedNode.name());
+
+                if (contains(cumulativeValidatedNodeNames, nodeNames)) {
+                    future.complete(null);
+                }
+            }
+
+            @Override
+            public void onNodeInvalidated(LogicalNode invalidatedNode) {
+                cumulativeValidatedNodeNames.remove(invalidatedNode.name());
+            }
+        };
+
+        logicalTopology.addEventListener(listener);
+
+        cmgManager.validatedNodes()
+                .thenAccept(validatedNodes -> {
+                    Set<String> validatedNodeNames = validatedNodes.stream()
+                            .map(ClusterNode::name)
+                            .collect(toSet());
+                    if (contains(validatedNodeNames, nodeNames)) {
+                        future.complete(null);
+                    }
+
+                    cumulativeValidatedNodeNames.addAll(validatedNodeNames);
+                });
+
+        return future
+                .thenRun(() -> logicalTopology.removeEventListener(listener));
+    }
+
+    private static boolean contains(Set<String> container, Set<String> 
containee) {
+        return difference(containee, container).isEmpty();
+    }
+
+    private CompletableFuture<Map<String, IndexWithTerm>> 
collectMetastorageIndexes(Set<String> participatingNodeNames) {
+        MetastorageIndexTermRequestMessage request = 
messagesFactory.metastorageIndexTermRequestMessage().build();
+
+        Map<String, CompletableFuture<MetastorageIndexTermResponseMessage>> 
responses = new HashMap<>();
+
+        for (String nodeName : participatingNodeNames) {
+            responses.put(
+                    nodeName,
+                    messagingService.invoke(nodeName, request, 
10_000).thenApply(MetastorageIndexTermResponseMessage.class::cast)
+            );
+        }
+
+        return allOf(responses.values()).thenApply(unused -> {
+            return responses.entrySet().stream()
+                    .collect(toMap(Entry::getKey, entry -> 
indexWithTerm(responses.get(entry.getKey()).join())));

Review Comment:
   `responses.get(entry.getKey())` is the same as `entry.getValue()`, is it?



##########
modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImplTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class MetastorageRepairImplTest extends BaseIgniteAbstractTest {

Review Comment:
   It is too early for integration tests, right?



##########
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.disaster.system.repair.MetastorageRepair;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implementation of {@link MetastorageRepair}.
+ */
+public class MetastorageRepairImpl implements MetastorageRepair {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MetastorageRepairImpl.class);
+
+    private final MessagingService messagingService;
+    private final LogicalTopology logicalTopology;
+    private final ClusterManagementGroupManager cmgManager;
+
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
+
+    /** Constructor. */
+    public MetastorageRepairImpl(
+            MessagingService messagingService,
+            LogicalTopology logicalTopology,
+            ClusterManagementGroupManager cmgManager
+    ) {
+        this.messagingService = messagingService;
+        this.logicalTopology = logicalTopology;
+        this.cmgManager = cmgManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> repair(Set<String> participatingNodeNames, 
int metastorageReplicationFactor) {
+        LOG.info("Starting MG repair [participatingNodes={}, 
replicationFactor={}].", participatingNodeNames, metastorageReplicationFactor);
+
+        return waitTillValidatedNodesContain(participatingNodeNames)
+                .thenCompose(unused -> 
collectMetastorageIndexes(participatingNodeNames))

Review Comment:
   What if it never happens, why do we assume that operation completes?



##########
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.disaster.system.repair.MetastorageRepair;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implementation of {@link MetastorageRepair}.
+ */
+public class MetastorageRepairImpl implements MetastorageRepair {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MetastorageRepairImpl.class);
+
+    private final MessagingService messagingService;
+    private final LogicalTopology logicalTopology;
+    private final ClusterManagementGroupManager cmgManager;
+
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
+
+    /** Constructor. */
+    public MetastorageRepairImpl(
+            MessagingService messagingService,
+            LogicalTopology logicalTopology,
+            ClusterManagementGroupManager cmgManager
+    ) {
+        this.messagingService = messagingService;
+        this.logicalTopology = logicalTopology;
+        this.cmgManager = cmgManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> repair(Set<String> participatingNodeNames, 
int metastorageReplicationFactor) {
+        LOG.info("Starting MG repair [participatingNodes={}, 
replicationFactor={}].", participatingNodeNames, metastorageReplicationFactor);
+
+        return waitTillValidatedNodesContain(participatingNodeNames)
+                .thenCompose(unused -> 
collectMetastorageIndexes(participatingNodeNames))
+                .thenCompose(indexes -> {
+                    LOG.info("Collected metastorage indexes [indexes={}].", 
indexes);
+
+                    Set<String> newMgNodes = nodesWithBestIndexes(indexes, 
metastorageReplicationFactor);
+                    LOG.info("Chose new MG nodes [mgNodes={}].", newMgNodes);
+
+                    String bestNodeName = chooseNodeWithBestIndex(indexes, 
newMgNodes);
+                    LOG.info("Chose best MG node [node={}].", bestNodeName);
+
+                    return cmgManager.changeMetastorageNodes(newMgNodes)
+                            .thenCompose(unused -> appointLeader(bestNodeName, 
indexes.get(bestNodeName).term(), newMgNodes))
+                            .thenRun(() -> LOG.info("Appointed MG leader 
forcefully [leader={}].", bestNodeName));
+                });
+    }
+
+    private CompletableFuture<Void> waitTillValidatedNodesContain(Set<String> 
nodeNames) {
+        Set<String> cumulativeValidatedNodeNames = 
ConcurrentHashMap.newKeySet();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        LogicalTopologyEventListener listener = new 
LogicalTopologyEventListener() {
+            @Override
+            public void onNodeValidated(LogicalNode validatedNode) {
+                cumulativeValidatedNodeNames.add(validatedNode.name());
+
+                if (contains(cumulativeValidatedNodeNames, nodeNames)) {
+                    future.complete(null);
+                }
+            }
+
+            @Override
+            public void onNodeInvalidated(LogicalNode invalidatedNode) {
+                cumulativeValidatedNodeNames.remove(invalidatedNode.name());
+            }
+        };
+
+        logicalTopology.addEventListener(listener);
+
+        cmgManager.validatedNodes()
+                .thenAccept(validatedNodes -> {
+                    Set<String> validatedNodeNames = validatedNodes.stream()
+                            .map(ClusterNode::name)
+                            .collect(toSet());
+                    if (contains(validatedNodeNames, nodeNames)) {
+                        future.complete(null);
+                    }
+
+                    cumulativeValidatedNodeNames.addAll(validatedNodeNames);
+                });
+
+        return future
+                .thenRun(() -> logicalTopology.removeEventListener(listener));
+    }
+
+    private static boolean contains(Set<String> container, Set<String> 
containee) {

Review Comment:
   Let's rename to `containsAll`, `contains` usually means `∈`, not `⊂`



##########
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.disaster.system.repair.MetastorageRepair;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implementation of {@link MetastorageRepair}.
+ */
+public class MetastorageRepairImpl implements MetastorageRepair {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MetastorageRepairImpl.class);
+
+    private final MessagingService messagingService;
+    private final LogicalTopology logicalTopology;
+    private final ClusterManagementGroupManager cmgManager;
+
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
+
+    /** Constructor. */
+    public MetastorageRepairImpl(
+            MessagingService messagingService,
+            LogicalTopology logicalTopology,
+            ClusterManagementGroupManager cmgManager
+    ) {
+        this.messagingService = messagingService;
+        this.logicalTopology = logicalTopology;
+        this.cmgManager = cmgManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> repair(Set<String> participatingNodeNames, 
int metastorageReplicationFactor) {
+        LOG.info("Starting MG repair [participatingNodes={}, 
replicationFactor={}].", participatingNodeNames, metastorageReplicationFactor);
+
+        return waitTillValidatedNodesContain(participatingNodeNames)
+                .thenCompose(unused -> 
collectMetastorageIndexes(participatingNodeNames))
+                .thenCompose(indexes -> {
+                    LOG.info("Collected metastorage indexes [indexes={}].", 
indexes);
+
+                    Set<String> newMgNodes = nodesWithBestIndexes(indexes, 
metastorageReplicationFactor);
+                    LOG.info("Chose new MG nodes [mgNodes={}].", newMgNodes);
+
+                    String bestNodeName = chooseNodeWithBestIndex(indexes, 
newMgNodes);
+                    LOG.info("Chose best MG node [node={}].", bestNodeName);
+
+                    return cmgManager.changeMetastorageNodes(newMgNodes)
+                            .thenCompose(unused -> appointLeader(bestNodeName, 
indexes.get(bestNodeName).term(), newMgNodes))
+                            .thenRun(() -> LOG.info("Appointed MG leader 
forcefully [leader={}].", bestNodeName));
+                });
+    }
+
+    private CompletableFuture<Void> waitTillValidatedNodesContain(Set<String> 
nodeNames) {
+        Set<String> cumulativeValidatedNodeNames = 
ConcurrentHashMap.newKeySet();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        LogicalTopologyEventListener listener = new 
LogicalTopologyEventListener() {
+            @Override
+            public void onNodeValidated(LogicalNode validatedNode) {
+                cumulativeValidatedNodeNames.add(validatedNode.name());
+
+                if (contains(cumulativeValidatedNodeNames, nodeNames)) {
+                    future.complete(null);
+                }
+            }
+
+            @Override
+            public void onNodeInvalidated(LogicalNode invalidatedNode) {
+                cumulativeValidatedNodeNames.remove(invalidatedNode.name());
+            }
+        };
+
+        logicalTopology.addEventListener(listener);
+
+        cmgManager.validatedNodes()
+                .thenAccept(validatedNodes -> {
+                    Set<String> validatedNodeNames = validatedNodes.stream()
+                            .map(ClusterNode::name)
+                            .collect(toSet());
+                    if (contains(validatedNodeNames, nodeNames)) {
+                        future.complete(null);
+                    }
+
+                    cumulativeValidatedNodeNames.addAll(validatedNodeNames);
+                });
+
+        return future
+                .thenRun(() -> logicalTopology.removeEventListener(listener));
+    }
+
+    private static boolean contains(Set<String> container, Set<String> 
containee) {
+        return difference(containee, container).isEmpty();
+    }
+
+    private CompletableFuture<Map<String, IndexWithTerm>> 
collectMetastorageIndexes(Set<String> participatingNodeNames) {
+        MetastorageIndexTermRequestMessage request = 
messagesFactory.metastorageIndexTermRequestMessage().build();
+
+        Map<String, CompletableFuture<MetastorageIndexTermResponseMessage>> 
responses = new HashMap<>();
+
+        for (String nodeName : participatingNodeNames) {
+            responses.put(
+                    nodeName,
+                    messagingService.invoke(nodeName, request, 
10_000).thenApply(MetastorageIndexTermResponseMessage.class::cast)
+            );
+        }
+
+        return allOf(responses.values()).thenApply(unused -> {
+            return responses.entrySet().stream()
+                    .collect(toMap(Entry::getKey, entry -> 
indexWithTerm(responses.get(entry.getKey()).join())));
+        });
+    }
+
+    private static IndexWithTerm 
indexWithTerm(MetastorageIndexTermResponseMessage message) {
+        return new IndexWithTerm(message.raftIndex(), message.raftTerm());
+    }
+
+    private static Set<String> nodesWithBestIndexes(Map<String, IndexWithTerm> 
indexes, int metastorageReplicationFactor) {
+        return indexes.entrySet().stream()
+                .sorted(Entry.<String, 
IndexWithTerm>comparingByValue().reversed())
+                .limit(metastorageReplicationFactor)
+                .map(Entry::getKey)
+                .collect(toSet());

Review Comment:
   Maybe we want to use a sorted or a linked set here? Do we just ignore the 
sorting after we're done with this method?



##########
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.disaster.system.repair.MetastorageRepair;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implementation of {@link MetastorageRepair}.
+ */
+public class MetastorageRepairImpl implements MetastorageRepair {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MetastorageRepairImpl.class);
+
+    private final MessagingService messagingService;
+    private final LogicalTopology logicalTopology;
+    private final ClusterManagementGroupManager cmgManager;
+
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
+
+    /** Constructor. */
+    public MetastorageRepairImpl(
+            MessagingService messagingService,
+            LogicalTopology logicalTopology,
+            ClusterManagementGroupManager cmgManager
+    ) {
+        this.messagingService = messagingService;
+        this.logicalTopology = logicalTopology;
+        this.cmgManager = cmgManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> repair(Set<String> participatingNodeNames, 
int metastorageReplicationFactor) {
+        LOG.info("Starting MG repair [participatingNodes={}, 
replicationFactor={}].", participatingNodeNames, metastorageReplicationFactor);
+
+        return waitTillValidatedNodesContain(participatingNodeNames)
+                .thenCompose(unused -> 
collectMetastorageIndexes(participatingNodeNames))
+                .thenCompose(indexes -> {
+                    LOG.info("Collected metastorage indexes [indexes={}].", 
indexes);

Review Comment:
   I think that 4 spaces of padding are missing here



##########
modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.disaster.system.repair.MetastorageRepair;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implementation of {@link MetastorageRepair}.
+ */
+public class MetastorageRepairImpl implements MetastorageRepair {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MetastorageRepairImpl.class);
+
+    private final MessagingService messagingService;
+    private final LogicalTopology logicalTopology;
+    private final ClusterManagementGroupManager cmgManager;
+
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
+
+    /** Constructor. */
+    public MetastorageRepairImpl(
+            MessagingService messagingService,
+            LogicalTopology logicalTopology,
+            ClusterManagementGroupManager cmgManager
+    ) {
+        this.messagingService = messagingService;
+        this.logicalTopology = logicalTopology;
+        this.cmgManager = cmgManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> repair(Set<String> participatingNodeNames, 
int metastorageReplicationFactor) {
+        LOG.info("Starting MG repair [participatingNodes={}, 
replicationFactor={}].", participatingNodeNames, metastorageReplicationFactor);
+
+        return waitTillValidatedNodesContain(participatingNodeNames)
+                .thenCompose(unused -> 
collectMetastorageIndexes(participatingNodeNames))
+                .thenCompose(indexes -> {
+                    LOG.info("Collected metastorage indexes [indexes={}].", 
indexes);
+
+                    Set<String> newMgNodes = nodesWithBestIndexes(indexes, 
metastorageReplicationFactor);
+                    LOG.info("Chose new MG nodes [mgNodes={}].", newMgNodes);
+
+                    String bestNodeName = chooseNodeWithBestIndex(indexes, 
newMgNodes);
+                    LOG.info("Chose best MG node [node={}].", bestNodeName);
+
+                    return cmgManager.changeMetastorageNodes(newMgNodes)
+                            .thenCompose(unused -> appointLeader(bestNodeName, 
indexes.get(bestNodeName).term(), newMgNodes))
+                            .thenRun(() -> LOG.info("Appointed MG leader 
forcefully [leader={}].", bestNodeName));
+                });
+    }
+
+    private CompletableFuture<Void> waitTillValidatedNodesContain(Set<String> 
nodeNames) {
+        Set<String> cumulativeValidatedNodeNames = 
ConcurrentHashMap.newKeySet();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        LogicalTopologyEventListener listener = new 
LogicalTopologyEventListener() {
+            @Override
+            public void onNodeValidated(LogicalNode validatedNode) {
+                cumulativeValidatedNodeNames.add(validatedNode.name());
+
+                if (contains(cumulativeValidatedNodeNames, nodeNames)) {
+                    future.complete(null);
+                }
+            }
+
+            @Override
+            public void onNodeInvalidated(LogicalNode invalidatedNode) {
+                cumulativeValidatedNodeNames.remove(invalidatedNode.name());
+            }
+        };
+
+        logicalTopology.addEventListener(listener);
+
+        cmgManager.validatedNodes()
+                .thenAccept(validatedNodes -> {
+                    Set<String> validatedNodeNames = validatedNodes.stream()
+                            .map(ClusterNode::name)
+                            .collect(toSet());
+                    if (contains(validatedNodeNames, nodeNames)) {
+                        future.complete(null);
+                    }
+
+                    cumulativeValidatedNodeNames.addAll(validatedNodeNames);
+                });
+
+        return future
+                .thenRun(() -> logicalTopology.removeEventListener(listener));
+    }
+
+    private static boolean contains(Set<String> container, Set<String> 
containee) {
+        return difference(containee, container).isEmpty();
+    }
+
+    private CompletableFuture<Map<String, IndexWithTerm>> 
collectMetastorageIndexes(Set<String> participatingNodeNames) {
+        MetastorageIndexTermRequestMessage request = 
messagesFactory.metastorageIndexTermRequestMessage().build();
+
+        Map<String, CompletableFuture<MetastorageIndexTermResponseMessage>> 
responses = new HashMap<>();
+
+        for (String nodeName : participatingNodeNames) {
+            responses.put(
+                    nodeName,
+                    messagingService.invoke(nodeName, request, 
10_000).thenApply(MetastorageIndexTermResponseMessage.class::cast)
+            );
+        }
+
+        return allOf(responses.values()).thenApply(unused -> {
+            return responses.entrySet().stream()
+                    .collect(toMap(Entry::getKey, entry -> 
indexWithTerm(responses.get(entry.getKey()).join())));
+        });
+    }
+
+    private static IndexWithTerm 
indexWithTerm(MetastorageIndexTermResponseMessage message) {
+        return new IndexWithTerm(message.raftIndex(), message.raftTerm());
+    }
+
+    private static Set<String> nodesWithBestIndexes(Map<String, IndexWithTerm> 
indexes, int metastorageReplicationFactor) {
+        return indexes.entrySet().stream()
+                .sorted(Entry.<String, 
IndexWithTerm>comparingByValue().reversed())
+                .limit(metastorageReplicationFactor)
+                .map(Entry::getKey)
+                .collect(toSet());
+    }
+
+    private static String chooseNodeWithBestIndex(Map<String, IndexWithTerm> 
indexes, Set<String> newMgNodes) {
+        return newMgNodes.stream()
+                .sorted(Comparator.<String, 
IndexWithTerm>comparing(indexes::get).reversed())
+                .limit(1)
+                .findFirst().orElseThrow();

Review Comment:
   There's a `Stream#min` and `Stream#max`, depending on the desired comparison 
order. I suggest using one of these methods



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to