kgusakov commented on code in PR #4803: URL: https://github.com/apache/ignite-3/pull/4803#discussion_r1873232695
########## modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java: ########## @@ -202,7 +244,170 @@ private void checkRecoveryRequestOnlyOne(IgniteImpl node) { ); } + private void waitAndAssertStableAssignmentsOfPartitionEqualTo( + IgniteImpl gatewayNode, String tableName, Set<Integer> partitionIds, Set<String> nodes) { + partitionIds.forEach(p -> { + try { + waitAndAssertStableAssignmentsOfPartitionEqualTo(gatewayNode, tableName, p, nodes); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + } + + private void waitAndAssertStableAssignmentsOfPartitionEqualTo(IgniteImpl gatewayNode, String tableName, int partNum, Set<String> nodes) + throws InterruptedException { + + assertTrue(waitForCondition(() -> + nodes.equals( + getPartitionClusterNodes(gatewayNode, tableName, partNum) + .stream() + .map(Assignment::consistentId) + .collect(Collectors.toUnmodifiableSet()) + ), + 500, + 30_000 + ), "Expected set of nodes: " + nodes + " actual: " + getPartitionClusterNodes(gatewayNode, tableName, partNum) + .stream() + .map(Assignment::consistentId) + .collect(Collectors.toUnmodifiableSet())); + } + private static Entry getRecoveryTriggerKey(IgniteImpl node) { return node.metaStorageManager().getLocally(RECOVERY_TRIGGER_KEY, Long.MAX_VALUE); } + + private Set<Assignment> getPartitionClusterNodes(IgniteImpl node, String tableName, int partNum) { + return Optional.ofNullable(getTableId(node.catalogManager(), tableName, clock.nowLong())) + .map(tableId -> partitionAssignments(node.metaStorageManager(), tableId, partNum).join()) + .orElse(Set.of()); + } + + private int zoneIdByName(CatalogService catalogService, String zoneName) { + return catalogService.zone(zoneName, clock.nowLong()).id(); + } + + private void createHaZoneWithTable() throws InterruptedException { + executeSql(String.format( + "CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, STORAGE_PROFILES='%s', CONSISTENCY_MODE='HIGH_AVAILABILITY'", + HA_ZONE_NAME, initialNodes(), PARTITIONS_NUMBER, DEFAULT_STORAGE_PROFILE + )); + + executeSql(String.format( + "CREATE TABLE %s (id INT PRIMARY KEY, val INT) ZONE %s", + HA_TABLE_NAME, HA_ZONE_NAME + )); + + Set<String> allNodes = runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet()); + + awaitForAllNodesTableGroupInitialization(); + + waitAndAssertStableAssignmentsOfPartitionEqualTo(unwrapIgniteImpl(node(0)), HA_TABLE_NAME, Set.of(0, 1), allNodes); + } + + private void createScZoneWithTable() { + executeSql(String.format( + "CREATE ZONE %s WITH REPLICAS=%s, PARTITIONS=%s, STORAGE_PROFILES='%s', CONSISTENCY_MODE='STRONG_CONSISTENCY'", + SC_ZONE_NAME, initialNodes(), PARTITIONS_NUMBER, DEFAULT_STORAGE_PROFILE + )); + + executeSql(String.format( + "CREATE TABLE %s (id INT PRIMARY KEY, val INT) ZONE %s", + SC_TABLE_NAME, SC_ZONE_NAME + )); + + Set<String> allNodes = runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet()); + + waitAndAssertStableAssignmentsOfPartitionEqualTo(unwrapIgniteImpl(node(0)), SC_TABLE_NAME, Set.of(0, 1), allNodes); + } + + private void assertRecoveryKeyIsEmpty(IgniteImpl gatewayNode) { + assertTrue(getRecoveryTriggerKey(gatewayNode).empty()); + } + + private void waitAndAssertRecoveryKeyIsNotEmpty(IgniteImpl gatewayNode) throws InterruptedException { + waitAndAssertRecoveryKeyIsNotEmpty(gatewayNode, 5_000); + } + + private void waitAndAssertRecoveryKeyIsNotEmpty(IgniteImpl gatewayNode, long timeoutMillis) throws InterruptedException { + assertTrue(waitForCondition(() -> !getRecoveryTriggerKey(gatewayNode).empty(), timeoutMillis)); + } + + private void stopNodes(Integer... nodes) { + Arrays.stream(nodes).forEach(this::stopNode); + } + + private void changePartitionDistributionTimeout(IgniteImpl gatewayNode, int timeout) { + CompletableFuture<Void> changeFuture = gatewayNode + .clusterConfiguration() + .getConfiguration(SystemDistributedExtensionConfiguration.KEY) + .system().change(c0 -> c0.changeProperties() + .createOrUpdate(PARTITION_DISTRIBUTION_RESET_TIMEOUT, + c1 -> c1.changePropertyValue(String.valueOf(timeout))) + ); + + assertThat(changeFuture, willCompleteSuccessfully()); + } + + private long waitForSpecificZoneTopologyUpdateAndReturnUpdateRevision( + IgniteImpl gatewayNode, String zoneName, Set<String> targetTopologyUpdate + ) throws InterruptedException { + int zoneId = zoneIdByName(gatewayNode.catalogManager(), zoneName); + + AtomicLong revision = new AtomicLong(); + + assertTrue(waitForCondition(() -> { + var state = gatewayNode + .distributionZoneManager() + .zonesState() + .get(zoneId); + + if (state != null) { + var lastEntry = state.topologyAugmentationMap().lastEntry(); + + var isTheSameAsTarget = lastEntry.getValue().nodes() + .stream() + .map(Node::nodeName) + .collect(Collectors.toUnmodifiableSet()) + .equals(targetTopologyUpdate); + + if (isTheSameAsTarget) { + revision.set(lastEntry.getKey()); + } + + return isTheSameAsTarget; + } + return false; + }, 10_000)); + + assert revision.get() != 0; + + return revision.get(); + } + + private void awaitForAllNodesTableGroupInitialization() throws InterruptedException { + assertTrue(waitForCondition(() -> { + AtomicLong numberOfInitializedReplicas = new AtomicLong(0); Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org