kgusakov commented on code in PR #4803:
URL: https://github.com/apache/ignite-3/pull/4803#discussion_r1873232695


##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java:
##########
@@ -202,7 +244,170 @@ private void checkRecoveryRequestOnlyOne(IgniteImpl node) 
{
         );
     }
 
+    private void waitAndAssertStableAssignmentsOfPartitionEqualTo(
+            IgniteImpl gatewayNode, String tableName, Set<Integer> 
partitionIds, Set<String> nodes) {
+        partitionIds.forEach(p -> {
+            try {
+                waitAndAssertStableAssignmentsOfPartitionEqualTo(gatewayNode, 
tableName, p, nodes);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+    }
+
+    private void waitAndAssertStableAssignmentsOfPartitionEqualTo(IgniteImpl 
gatewayNode, String tableName, int partNum, Set<String> nodes)
+            throws InterruptedException {
+
+        assertTrue(waitForCondition(() ->
+                nodes.equals(
+                        getPartitionClusterNodes(gatewayNode, tableName, 
partNum)
+                                .stream()
+                                .map(Assignment::consistentId)
+                                .collect(Collectors.toUnmodifiableSet())
+                ),
+                500,
+                30_000
+        ), "Expected set of nodes: " + nodes + " actual: " + 
getPartitionClusterNodes(gatewayNode, tableName, partNum)
+                .stream()
+                .map(Assignment::consistentId)
+                .collect(Collectors.toUnmodifiableSet()));
+    }
+
     private static Entry getRecoveryTriggerKey(IgniteImpl node) {
         return node.metaStorageManager().getLocally(RECOVERY_TRIGGER_KEY, 
Long.MAX_VALUE);
     }
+
+    private Set<Assignment> getPartitionClusterNodes(IgniteImpl node, String 
tableName, int partNum) {
+        return Optional.ofNullable(getTableId(node.catalogManager(), 
tableName, clock.nowLong()))
+                .map(tableId -> 
partitionAssignments(node.metaStorageManager(), tableId, partNum).join())
+                .orElse(Set.of());
+    }
+
+    private int zoneIdByName(CatalogService catalogService, String zoneName) {
+        return catalogService.zone(zoneName, clock.nowLong()).id();
+    }
+
+    private void createHaZoneWithTable() throws InterruptedException {
+        executeSql(String.format(
+                "CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, 
STORAGE_PROFILES='%s', CONSISTENCY_MODE='HIGH_AVAILABILITY'",
+                HA_ZONE_NAME, initialNodes(), PARTITIONS_NUMBER, 
DEFAULT_STORAGE_PROFILE
+        ));
+
+        executeSql(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, val INT) ZONE %s",
+                HA_TABLE_NAME, HA_ZONE_NAME
+        ));
+
+        Set<String> allNodes = 
runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet());
+
+        awaitForAllNodesTableGroupInitialization();
+
+        
waitAndAssertStableAssignmentsOfPartitionEqualTo(unwrapIgniteImpl(node(0)), 
HA_TABLE_NAME, Set.of(0, 1), allNodes);
+    }
+
+    private void createScZoneWithTable() {
+        executeSql(String.format(
+                "CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, 
STORAGE_PROFILES='%s', CONSISTENCY_MODE='STRONG_CONSISTENCY'",
+                SC_ZONE_NAME, initialNodes(), PARTITIONS_NUMBER, 
DEFAULT_STORAGE_PROFILE
+        ));
+
+        executeSql(String.format(
+                "CREATE TABLE %s (id INT PRIMARY KEY, val INT) ZONE %s",
+                SC_TABLE_NAME, SC_ZONE_NAME
+        ));
+
+        Set<String> allNodes = 
runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet());
+
+        
waitAndAssertStableAssignmentsOfPartitionEqualTo(unwrapIgniteImpl(node(0)), 
SC_TABLE_NAME, Set.of(0, 1), allNodes);
+    }
+
+    private void assertRecoveryKeyIsEmpty(IgniteImpl gatewayNode) {
+        assertTrue(getRecoveryTriggerKey(gatewayNode).empty());
+    }
+
+    private void waitAndAssertRecoveryKeyIsNotEmpty(IgniteImpl gatewayNode) 
throws InterruptedException {
+        waitAndAssertRecoveryKeyIsNotEmpty(gatewayNode, 5_000);
+    }
+
+    private void waitAndAssertRecoveryKeyIsNotEmpty(IgniteImpl gatewayNode, 
long timeoutMillis) throws InterruptedException {
+        assertTrue(waitForCondition(() -> 
!getRecoveryTriggerKey(gatewayNode).empty(), timeoutMillis));
+    }
+
+    private void stopNodes(Integer... nodes) {
+        Arrays.stream(nodes).forEach(this::stopNode);
+    }
+
+    private void changePartitionDistributionTimeout(IgniteImpl gatewayNode, 
int timeout) {
+        CompletableFuture<Void> changeFuture = gatewayNode
+                .clusterConfiguration()
+                .getConfiguration(SystemDistributedExtensionConfiguration.KEY)
+                .system().change(c0 -> c0.changeProperties()
+                        .createOrUpdate(PARTITION_DISTRIBUTION_RESET_TIMEOUT,
+                                c1 -> 
c1.changePropertyValue(String.valueOf(timeout)))
+                );
+
+        assertThat(changeFuture, willCompleteSuccessfully());
+    }
+
+    private long waitForSpecificZoneTopologyUpdateAndReturnUpdateRevision(
+            IgniteImpl gatewayNode, String zoneName, Set<String> 
targetTopologyUpdate
+    ) throws InterruptedException {
+        int zoneId = zoneIdByName(gatewayNode.catalogManager(), zoneName);
+
+        AtomicLong revision = new AtomicLong();
+
+        assertTrue(waitForCondition(() -> {
+            var state = gatewayNode
+                    .distributionZoneManager()
+                    .zonesState()
+                    .get(zoneId);
+
+            if (state != null) {
+                var lastEntry = state.topologyAugmentationMap().lastEntry();
+
+                var isTheSameAsTarget = lastEntry.getValue().nodes()
+                        .stream()
+                        .map(Node::nodeName)
+                        .collect(Collectors.toUnmodifiableSet())
+                        .equals(targetTopologyUpdate);
+
+                if (isTheSameAsTarget) {
+                    revision.set(lastEntry.getKey());
+                }
+
+                return isTheSameAsTarget;
+            }
+            return false;
+        }, 10_000));
+
+        assert revision.get() != 0;
+
+        return revision.get();
+    }
+
+    private void awaitForAllNodesTableGroupInitialization() throws 
InterruptedException {
+        assertTrue(waitForCondition(() -> {
+            AtomicLong numberOfInitializedReplicas = new AtomicLong(0);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to