sanpwc commented on code in PR #4966:
URL: https://github.com/apache/ignite-3/pull/4966#discussion_r1898668775


##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -1234,6 +1242,295 @@ void testTwoPhaseResetOnEmptyNodes() throws Exception {
         assertPlannedAssignments(node0, partId, assignments13);
     }
 
+    @Test
+    @ZoneParams(nodes = 7, replicas = 3, partitions = 1, consistencyMode = 
ConsistencyMode.HIGH_AVAILABILITY)
+    void testAssignmentsChainUpdate() throws Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = igniteImpl(0);
+        int catalogVersion = node0.catalogManager().latestCatalogVersion();
+        long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+
+        assertRealAssignments(node0, partId, 0, 2, 3);
+
+        Assignments initialAssignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(2).name()),
+                Assignment.forPeer(node(3).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, initialAssignments);
+
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(initialAssignments));
+
+        // Write data(1) to all nodes.
+        List<Throwable> errors = insertValues(table, partId, 0);
+        assertThat(errors, is(empty()));
+
+        logger().info("Stopping nodes [ids={}].", 3);
+
+        stopNode(3);
+
+        logger().info("Stopped nodes [ids={}].", 3);
+
+        Assignments link2Assignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name())
+        ), timestamp);
+
+        assertRealAssignments(node0, partId, 0, 1, 2);
+
+        assertStableAssignments(node0, partId, link2Assignments, 30_000);
+
+        // Graceful change should reinit the assignments chain, in other words 
there should be only one link
+        // in the chain - the current stable assignments.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(link2Assignments)));
+
+        // Disable scale down to avoid unwanted rebalance.
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+        // Disable automatic rebalance since we want to restore replica factor.
+        setDistributionResetTimeout(node0, INFINITE_TIMER_VALUE);
+
+        // Now stop the majority and the automatic reset should kick in.
+        logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{1, 
2}));
+
+        Assignments resetAssignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(4).name()),
+                Assignment.forPeer(node(5).name())
+        ), timestamp);
+
+        AtomicBoolean blockedLink = new AtomicBoolean(true);
+
+        // Block stable switch to check that we initially add reset phase 1 
assignments to the chain.
+        blockMessage((nodeName, msg) -> blockedLink.get() && 
stableKeySwitchMessage(msg, partId, resetAssignments));
+
+        stopNodesInParallel(1, 2);
+
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetAllPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                true,
+                -1
+        );
+
+        assertThat(updateFuture, willCompleteSuccessfully());
+
+        Assignments linkFirstPhaseReset = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, linkFirstPhaseReset, 60_000);
+
+        // Assignments chain consists of stable and the first phase of reset.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(link2Assignments, linkFirstPhaseReset)));
+
+        // Unblock stable switch, wait for reset phase 2 assignments to 
replace phase 1 assignments in the chain.
+        blockedLink.set(false);
+
+        logger().info("Unblocked stable switch.");
+
+        assertRealAssignments(node0, partId, 0, 4, 5);
+
+        assertStableAssignments(node0, partId, resetAssignments, 60_000);
+
+        // Assignments chain consists of stable and the second phase of reset.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(link2Assignments, resetAssignments)));
+    }
+
+    @Test
+    @ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode = 
ConsistencyMode.HIGH_AVAILABILITY)
+    void testAssignmentsChainUpdatedOnAutomaticReset() throws Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = igniteImpl(0);
+        int catalogVersion = node0.catalogManager().latestCatalogVersion();
+        long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+
+        // Disable scale down to avoid unwanted rebalance.
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+        assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
+
+        Assignments allAssignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name()),
+                Assignment.forPeer(node(3).name()),
+                Assignment.forPeer(node(4).name()),
+                Assignment.forPeer(node(5).name()),
+                Assignment.forPeer(node(6).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, allAssignments);
+
+        // Assignments chain is equal to the stable assignments.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(allAssignments));
+
+        // Write data(1) to all nodes.
+        List<Throwable> errors = insertValues(table, partId, 0);
+        assertThat(errors, is(empty()));
+
+        Assignments link2Assignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name())
+        ), timestamp);
+
+        AtomicBoolean blockedLink2 = new AtomicBoolean(true);
+
+        // Block stable switch to check that we initially add reset phase 1 
assignments to the chain.
+        blockMessage((nodeName, msg) -> blockedLink2.get() && 
stableKeySwitchMessage(msg, partId, link2Assignments));
+
+        logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{3, 
4, 5, 6}));
+
+        stopNodesInParallel(3, 4, 5, 6);
+
+        Assignments link2FirstPhaseReset = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, link2FirstPhaseReset, 60_000);
+
+        // Assignments chain consists of stable and the first phase of reset.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(allAssignments, link2FirstPhaseReset)));
+
+        // Unblock stable switch, wait for reset phase 2 assignments to 
replace phase 1 assignments in the chain.
+        blockedLink2.set(false);
+
+        assertStableAssignments(node0, partId, link2Assignments, 30_000);
+
+        // Assignments chain consists of stable and the second phase of reset.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(allAssignments, link2Assignments)));
+
+        logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{1, 
2}));
+        stopNodesInParallel(1, 2);
+
+        Assignments link3Assignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, link3Assignments, 30_000);
+
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(allAssignments, link2Assignments, 
link3Assignments)));
+    }
+
+    @Test
+    @ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode = 
ConsistencyMode.HIGH_AVAILABILITY)
+    void testSecondResetRewritesFirst() throws Exception {

Review Comment:
   Yep, good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to