alievmirza commented on code in PR #4803:
URL: https://github.com/apache/ignite-3/pull/4803#discussion_r1869385941


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -248,29 +252,51 @@ public List<SystemView<?>> systemViews() {
         );
     }
 
+    @TestOnly
+    public Map<UUID, CompletableFuture<Void>> ongoingOperationsById() {
+        return ongoingOperationsById;
+    }
+
     private CompletableFuture<Boolean> 
onHaZoneTopologyReduce(HaZoneTopologyUpdateEventParams params) {
         int zoneId = params.zoneId();
         long timestamp = params.timestamp();
 
         CatalogZoneDescriptor zoneDescriptor = catalogManager.zone(zoneId, 
timestamp);
         int catalogVersion = catalogManager.activeCatalogVersion(timestamp);
+        long revision = 
metaStorageManager.revisionByTimestampLocally(HybridTimestamp.hybridTimestamp(timestamp));

Review Comment:
   I see that we work with revisions, convert them to a timestamp, and set it on 
`HaZoneTopologyUpdateEventParams`, and after that we introduce 
`revisionByTimestampLocally` to convert the timestamp back to a revision. Why can't 
we just pass the revision to `HaZoneTopologyUpdateEventParams`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -248,29 +252,51 @@ public List<SystemView<?>> systemViews() {
         );
     }
 
+    @TestOnly
+    public Map<UUID, CompletableFuture<Void>> ongoingOperationsById() {
+        return ongoingOperationsById;
+    }
+
     private CompletableFuture<Boolean> 
onHaZoneTopologyReduce(HaZoneTopologyUpdateEventParams params) {
         int zoneId = params.zoneId();
         long timestamp = params.timestamp();
 
         CatalogZoneDescriptor zoneDescriptor = catalogManager.zone(zoneId, 
timestamp);
         int catalogVersion = catalogManager.activeCatalogVersion(timestamp);
+        long revision = 
metaStorageManager.revisionByTimestampLocally(HybridTimestamp.hybridTimestamp(timestamp));
 
         List<CatalogTableDescriptor> tables = findTablesByZoneId(zoneId, 
catalogVersion, catalogManager);
         List<CompletableFuture<Void>> tablesResetFuts = new ArrayList<>();
         for (CatalogTableDescriptor table : tables) {
             Set<Integer> partitionsToReset = new HashSet<>();
             for (int partId = 0; partId < zoneDescriptor.partitions(); 
partId++) {
-                // TODO: https://issues.apache.org/jira/browse/IGNITE-23599 
implement check for majority loss
-                partitionsToReset.add(partId);
+                TablePartitionId partitionId = new 
TablePartitionId(table.id(), partId);
+
+                if (stableAssignmentsWithOnlyAliveNodes(partitionId, 
revision).size() < zoneDescriptor.replicas() / 2 + 1) {

Review Comment:
   ```suggestion
                   if (stableAssignmentsWithOnlyAliveNodes(partitionId, 
revision).size() < (zoneDescriptor.replicas() / 2 + 1)) {
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -248,29 +252,51 @@ public List<SystemView<?>> systemViews() {
         );
     }
 
+    @TestOnly
+    public Map<UUID, CompletableFuture<Void>> ongoingOperationsById() {
+        return ongoingOperationsById;
+    }
+
     private CompletableFuture<Boolean> 
onHaZoneTopologyReduce(HaZoneTopologyUpdateEventParams params) {
         int zoneId = params.zoneId();
         long timestamp = params.timestamp();
 
         CatalogZoneDescriptor zoneDescriptor = catalogManager.zone(zoneId, 
timestamp);
         int catalogVersion = catalogManager.activeCatalogVersion(timestamp);
+        long revision = 
metaStorageManager.revisionByTimestampLocally(HybridTimestamp.hybridTimestamp(timestamp));
 
         List<CatalogTableDescriptor> tables = findTablesByZoneId(zoneId, 
catalogVersion, catalogManager);
         List<CompletableFuture<Void>> tablesResetFuts = new ArrayList<>();
         for (CatalogTableDescriptor table : tables) {
             Set<Integer> partitionsToReset = new HashSet<>();
             for (int partId = 0; partId < zoneDescriptor.partitions(); 
partId++) {
-                // TODO: https://issues.apache.org/jira/browse/IGNITE-23599 
implement check for majority loss
-                partitionsToReset.add(partId);
+                TablePartitionId partitionId = new 
TablePartitionId(table.id(), partId);
+
+                if (stableAssignmentsWithOnlyAliveNodes(partitionId, 
revision).size() < zoneDescriptor.replicas() / 2 + 1) {

Review Comment:
   I would add parentheses to make it more readable.



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java:
##########
@@ -202,7 +244,170 @@ private void checkRecoveryRequestOnlyOne(IgniteImpl node) 
{
         );
     }
 
+    private void waitAndAssertStableAssignmentsOfPartitionEqualTo(
+            IgniteImpl gatewayNode, String tableName, Set<Integer> 
partitionIds, Set<String> nodes) {
+        partitionIds.forEach(p -> {
+            try {
+                waitAndAssertStableAssignmentsOfPartitionEqualTo(gatewayNode, 
tableName, p, nodes);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+    }
+
+    private void waitAndAssertStableAssignmentsOfPartitionEqualTo(IgniteImpl 
gatewayNode, String tableName, int partNum, Set<String> nodes)
+            throws InterruptedException {
+
+        assertTrue(waitForCondition(() ->
+                nodes.equals(
+                        getPartitionClusterNodes(gatewayNode, tableName, 
partNum)
+                                .stream()
+                                .map(Assignment::consistentId)
+                                .collect(Collectors.toUnmodifiableSet())
+                ),
+                500,
+                30_000
+        ), "Expected set of nodes: " + nodes + " actual: " + 
getPartitionClusterNodes(gatewayNode, tableName, partNum)
+                .stream()
+                .map(Assignment::consistentId)
+                .collect(Collectors.toUnmodifiableSet()));
+    }
+
     private static Entry getRecoveryTriggerKey(IgniteImpl node) {
         return node.metaStorageManager().getLocally(RECOVERY_TRIGGER_KEY, 
Long.MAX_VALUE);
     }
+
+    private Set<Assignment> getPartitionClusterNodes(IgniteImpl node, String 
tableName, int partNum) {
+        return Optional.ofNullable(getTableId(node.catalogManager(), 
tableName, clock.nowLong()))
+                .map(tableId -> 
partitionAssignments(node.metaStorageManager(), tableId, partNum).join())
+                .orElse(Set.of());
+    }
+
+    private int zoneIdByName(CatalogService catalogService, String zoneName) {
+        return catalogService.zone(zoneName, clock.nowLong()).id();
+    }
+
+    private void createHaZoneWithTable() throws InterruptedException {
+        executeSql(String.format(
+                "CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, 
STORAGE_PROFILES='%s', CONSISTENCY_MODE='HIGH_AVAILABILITY'",
+                HA_ZONE_NAME, initialNodes(), PARTITIONS_NUMBER, 
DEFAULT_STORAGE_PROFILE
+        ));
+
+        executeSql(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, val INT) ZONE %s",
+                HA_TABLE_NAME, HA_ZONE_NAME
+        ));
+
+        Set<String> allNodes = 
runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet());
+
+        awaitForAllNodesTableGroupInitialization();
+
+        
waitAndAssertStableAssignmentsOfPartitionEqualTo(unwrapIgniteImpl(node(0)), 
HA_TABLE_NAME, Set.of(0, 1), allNodes);
+    }
+
+    private void createScZoneWithTable() {
+        executeSql(String.format(
+                "CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, 
STORAGE_PROFILES='%s', CONSISTENCY_MODE='STRONG_CONSISTENCY'",
+                SC_ZONE_NAME, initialNodes(), PARTITIONS_NUMBER, 
DEFAULT_STORAGE_PROFILE
+        ));
+
+        executeSql(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, val INT) ZONE %s",
+                SC_TABLE_NAME, SC_ZONE_NAME
+        ));
+
+        Set<String> allNodes = 
runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet());
+
+        
waitAndAssertStableAssignmentsOfPartitionEqualTo(unwrapIgniteImpl(node(0)), 
SC_TABLE_NAME, Set.of(0, 1), allNodes);
+    }
+
+    private void assertRecoveryKeyIsEmpty(IgniteImpl gatewayNode) {
+        assertTrue(getRecoveryTriggerKey(gatewayNode).empty());
+    }
+
+    private void waitAndAssertRecoveryKeyIsNotEmpty(IgniteImpl gatewayNode) 
throws InterruptedException {
+        waitAndAssertRecoveryKeyIsNotEmpty(gatewayNode, 5_000);
+    }
+
+    private void waitAndAssertRecoveryKeyIsNotEmpty(IgniteImpl gatewayNode, 
long timeoutMillis) throws InterruptedException {
+        assertTrue(waitForCondition(() -> 
!getRecoveryTriggerKey(gatewayNode).empty(), timeoutMillis));
+    }
+
+    private void stopNodes(Integer... nodes) {
+        Arrays.stream(nodes).forEach(this::stopNode);
+    }
+
+    private void changePartitionDistributionTimeout(IgniteImpl gatewayNode, 
int timeout) {
+        CompletableFuture<Void> changeFuture = gatewayNode
+                .clusterConfiguration()
+                .getConfiguration(SystemDistributedExtensionConfiguration.KEY)
+                .system().change(c0 -> c0.changeProperties()
+                        .createOrUpdate(PARTITION_DISTRIBUTION_RESET_TIMEOUT,
+                                c1 -> 
c1.changePropertyValue(String.valueOf(timeout)))
+                );
+
+        assertThat(changeFuture, willCompleteSuccessfully());
+    }
+
+    private long waitForSpecificZoneTopologyUpdateAndReturnUpdateRevision(
+            IgniteImpl gatewayNode, String zoneName, Set<String> 
targetTopologyUpdate
+    ) throws InterruptedException {
+        int zoneId = zoneIdByName(gatewayNode.catalogManager(), zoneName);
+
+        AtomicLong revision = new AtomicLong();
+
+        assertTrue(waitForCondition(() -> {
+            var state = gatewayNode
+                    .distributionZoneManager()
+                    .zonesState()
+                    .get(zoneId);
+
+            if (state != null) {
+                var lastEntry = state.topologyAugmentationMap().lastEntry();
+
+                var isTheSameAsTarget = lastEntry.getValue().nodes()
+                        .stream()
+                        .map(Node::nodeName)
+                        .collect(Collectors.toUnmodifiableSet())
+                        .equals(targetTopologyUpdate);
+
+                if (isTheSameAsTarget) {
+                    revision.set(lastEntry.getKey());
+                }
+
+                return isTheSameAsTarget;
+            }
+            return false;
+        }, 10_000));
+
+        assert revision.get() != 0;
+
+        return revision.get();
+    }
+
+    private void awaitForAllNodesTableGroupInitialization() throws 
InterruptedException {
+        assertTrue(waitForCondition(() -> {
+            AtomicLong numberOfInitializedReplicas = new AtomicLong(0);

Review Comment:
   I would make it an `AtomicInteger`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to