kgusakov commented on code in PR #4847:
URL: https://github.com/apache/ignite-3/pull/4847#discussion_r1883073462


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2235,10 +2263,25 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
                         return;
                     }
 
+                    // A node might exist in stable yet we don't want to start the replica
+                    // The case is: nodes A, B and C hold a replication group,
+                    // nodes A and B die.
+                    // Reset adds C into force pending and user writes data onto C.
+                    // Then A and B go back online. In this case
+                    // stable = [A, B, C], pending = [C, force] and only C should be started.
+                    if (isRecovery && !replicaMgr.isReplicaStarted(replicaGrpId)) {

Review Comment:
   Not sure that I understand:
   - How do we guarantee that this check will not break the non-HA case?
   - This piece of code does not start any nodes, it only updates the configuration of the raft client. Shouldn't we rewrite the comment a little bit?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1065,53 +1067,71 @@ private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters
     /**
      * Updates or creates partition raft groups and storages.
      *
-     * @param assignmentsFuture Table assignments.
+     * @param stableAssignmentsFuture Table assignments.
      * @param table Initialized table entity.
-     * @param zoneId Zone id.
      * @param isRecovery {@code true} if the node is being started up.
      * @param assignmentsTimestamp Assignments timestamp.
      * @return future, which will be completed when the partitions creations done.
      */
     private CompletableFuture<Void> startLocalPartitionsAndClients(
-            CompletableFuture<List<Assignments>> assignmentsFuture,
+            CompletableFuture<List<Assignments>> stableAssignmentsFuture,
+            List<@Nullable Assignments> pendingAssignmentsForPartitions,
             TableImpl table,
-            int zoneId,
             boolean isRecovery,
             long assignmentsTimestamp
     ) {
         int tableId = table.tableId();
 
         // Create new raft nodes according to new assignments.
-        return assignmentsFuture.thenCompose(tableAssignments -> {
+        return stableAssignmentsFuture.thenCompose(stableAssignmentsForPartitions -> {
             // Empty assignments might be a valid case if tables are created from within cluster init HOCON
             // configuration, which is not supported now.
-            assert tableAssignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);
+            assert stableAssignmentsForPartitions != null
+                    : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);
 
-            int partitions = tableAssignments.size();
+            int partitions = stableAssignmentsForPartitions.size();
 
             var futures = new CompletableFuture<?>[partitions];
 
             for (int i = 0; i < partitions; i++) {
                 int partId = i;
 
-                Assignments assignments = tableAssignments.get(i);
+                Assignments stableAssignments = stableAssignmentsForPartitions.get(i);
 
-                CompletableFuture<?> future = startPartitionAndStartClient(
-                        table,
-                        partId,
-                        localMemberAssignment(assignments),
-                        assignments,
-                        zoneId,
-                        isRecovery,
-                        assignmentsTimestamp
-                )
-                        .whenComplete((res, ex) -> {
-                            if (ex != null) {
-                                LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId);
-                            }
-                        });
+                Assignments pendingAssignments = pendingAssignmentsForPartitions.get(i);
+
+                Assignment localMemberAssignmentInStable = localMemberAssignment(stableAssignments);
 
-                futures[i] = future;
+                boolean shouldStartPartition;
+
+                if (isRecovery) {
+                    // The condition to start the replica is
+                    // `pending.contains(node) || (stable.contains(node) && !pending.isForce())`.
+                    // However we check only the right part of this condition here
+                    // since after `startTables` we have a call to `processAssignmentsOnRecovery`,

Review Comment:
   So, there is no way to put {@link } in this place, because we can't use javadoc here, am I right?



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -345,6 +348,145 @@ void testManualRebalanceIfPartitionIsLost() throws Exception {
         assertThat(errors, is(empty()));
     }
 
+    @Test
+    @ZoneParams(nodes = 5, replicas = 3, partitions = 1)
+    void testManualRebalanceRecovery() throws Exception {
+        int partId = 0;
+        // Disable scale down to avoid unwanted rebalance.
+        executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, 300));
+
+        IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+        int catalogVersion = node0.catalogManager().latestCatalogVersion();
+        long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+
+        Assignments allAssignments = Assignments.of(timestamp,
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name())
+        );
+
+        // The partition is replicated to 3 nodes.
+        assertRealAssignments(node0, partId, 0, 1, 2);
+
+        List<Throwable> errors = insertValues(table, partId, 0);
+        assertThat(errors, is(empty()));
+
+        // Make sure node 0 has the same data as node 1 and 2.
+        assertTrue(
+                waitForCondition(() -> getRaftLogIndex(0, partId).equals(getRaftLogIndex(1, partId)), SECONDS.toMillis(20)),
+                () -> "Node 0 log index = " + getRaftLogIndex(0, partId) + " node 1 log index= " + getRaftLogIndex(1, partId)
+        );
+        assertTrue(
+                waitForCondition(() -> getRaftLogIndex(0, partId).equals(getRaftLogIndex(2, partId)), SECONDS.toMillis(20)),
+                () -> "Node 0 log index = " + getRaftLogIndex(0, partId) + " node 2 log index= " + getRaftLogIndex(2, partId)
+        );
+
+        // Stop 1 and 2, only 0 survived.
+        stopNodesInParallel(1, 2);
+
+        Assignments assignment0 = Assignments.of(timestamp,
+                Assignment.forPeer(node(0).name())
+        );
+
+        // Blocking stable switch to the first phase or reset,
+        // so that we'll have force pending assignments unexecuted.
+        blockMessage((nodeName, msg) -> stableKeySwitchMessage(msg, partId, assignment0));
+
+        // Init reset:
+        // pending = [0, force]
+        // planned = [0, 3, 4]
+        CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                true
+        );
+
+        assertThat(updateFuture, willCompleteSuccessfully());
+
+        awaitPrimaryReplica(node0, partId);
+
+        // Since stable switch to 0 is blocked, data is stored on 0 node only.
+        assertRealAssignments(node0, partId, 0);
+
+        // And stable is in its initial state.
+        assertStableAssignments(node0, partId, allAssignments);
+
+        // Insert new data on the node 0.
+        errors = insertValues(table, partId, 10);
+        assertThat(errors, is(empty()));
+
+        // Start nodes 1 and 2 back, but they should not start their replicas.
+        startNode(1);
+        startNode(2);
+
+        // Make sure 1 and 2 did not start.
+        assertRealAssignments(node0, partId, 0);

Review Comment:
   We can have a positive check here only because the replicas have not YET been started at the moment of the check. I suppose startNode() will return before the actual recovery is executed (please check me). Looks like we need to find a mechanism to check that recovery has really finished, or something like this.
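
   For illustration, one possible shape of such a check (a sketch only; `recoveryFinishedFuture()` is a hypothetical accessor, not a confirmed API, and the other helpers are assumed from this test class):
   ```java
   // Sketch: wait until nodes 1 and 2 report that their local recovery has
   // completed before asserting that they did not start their replicas.
   for (int i : new int[] {1, 2}) {
       IgniteImpl restarted = unwrapIgniteImpl(cluster.node(i));
       // recoveryFinishedFuture() is hypothetical: any signal that the node has
       // finished processing assignments on recovery would do here.
       assertTrue(waitForCondition(() -> restarted.recoveryFinishedFuture().isDone(), SECONDS.toMillis(20)));
   }

   // Only after recovery is known to be complete is this assertion meaningful.
   assertRealAssignments(node0, partId, 0);
   ```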



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -345,6 +348,145 @@ void testManualRebalanceIfPartitionIsLost() throws Exception {
         assertThat(errors, is(empty()));
     }
 
+    @Test
+    @ZoneParams(nodes = 5, replicas = 3, partitions = 1)
+    void testManualRebalanceRecovery() throws Exception {
+        int partId = 0;
+        // Disable scale down to avoid unwanted rebalance.
+        executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, 300));

Review Comment:
   Can we use the clearer CatalogUtils.INFINITE_TIMER_VALUE constant here?
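
   Roughly, the suggested change would look like this (a sketch; it assumes the constant is imported or otherwise accessible in the test):
   ```java
   // Same statement with the named constant instead of the magic number 300.
   executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d",
           zoneName, CatalogUtils.INFINITE_TIMER_VALUE));
   ```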



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1065,53 +1067,71 @@ private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters
     /**
      * Updates or creates partition raft groups and storages.
      *
-     * @param assignmentsFuture Table assignments.
+     * @param stableAssignmentsFuture Table assignments.
      * @param table Initialized table entity.
-     * @param zoneId Zone id.
      * @param isRecovery {@code true} if the node is being started up.
      * @param assignmentsTimestamp Assignments timestamp.
      * @return future, which will be completed when the partitions creations done.
      */
     private CompletableFuture<Void> startLocalPartitionsAndClients(
-            CompletableFuture<List<Assignments>> assignmentsFuture,
+            CompletableFuture<List<Assignments>> stableAssignmentsFuture,
+            List<@Nullable Assignments> pendingAssignmentsForPartitions,
             TableImpl table,
-            int zoneId,
             boolean isRecovery,
             long assignmentsTimestamp
     ) {
         int tableId = table.tableId();
 
         // Create new raft nodes according to new assignments.
-        return assignmentsFuture.thenCompose(tableAssignments -> {
+        return stableAssignmentsFuture.thenCompose(stableAssignmentsForPartitions -> {
             // Empty assignments might be a valid case if tables are created from within cluster init HOCON
             // configuration, which is not supported now.
-            assert tableAssignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);
+            assert stableAssignmentsForPartitions != null
+                    : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);
 
-            int partitions = tableAssignments.size();
+            int partitions = stableAssignmentsForPartitions.size();
 
             var futures = new CompletableFuture<?>[partitions];
 
             for (int i = 0; i < partitions; i++) {
                 int partId = i;
 
-                Assignments assignments = tableAssignments.get(i);
+                Assignments stableAssignments = stableAssignmentsForPartitions.get(i);
 
-                CompletableFuture<?> future = startPartitionAndStartClient(
-                        table,
-                        partId,
-                        localMemberAssignment(assignments),
-                        assignments,
-                        zoneId,
-                        isRecovery,
-                        assignmentsTimestamp
-                )
-                        .whenComplete((res, ex) -> {
-                            if (ex != null) {
-                                LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId);
-                            }
-                        });
+                Assignments pendingAssignments = pendingAssignmentsForPartitions.get(i);
+
+                Assignment localMemberAssignmentInStable = localMemberAssignment(stableAssignments);
 
-                futures[i] = future;
+                boolean shouldStartPartition;
+
+                if (isRecovery) {
+                    // The condition to start the replica is
+                    // `pending.contains(node) || (stable.contains(node) && !pending.isForce())`.
+                    // However we check only the right part of this condition here
+                    // since after `startTables` we have a call to `processAssignmentsOnRecovery`,
+                    // which executes pending assignments update an will start required partitions there.

Review Comment:
   ```suggestion
                       // which executes pending assignments update and will start required partitions there.
   ```
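
   For readers of this thread, the full rule that the two code paths implement together can be modeled as a tiny self-contained sketch (illustrative names only, not the Ignite API):
   ```java
   import java.util.Set;

   final class ReplicaStartRule {
       /**
        * Start the replica iff the node is in pending assignments, or it is in
        * stable assignments and the pending assignments are not a forced reset.
        */
       static boolean shouldStart(String node, Set<String> stable, Set<String> pending, boolean pendingIsForce) {
           return pending.contains(node) || (stable.contains(node) && !pendingIsForce);
       }

       public static void main(String[] args) {
           // Scenario from the diff comment: stable = [A, B, C], pending = [C] (force).
           Set<String> stable = Set.of("A", "B", "C");
           Set<String> pending = Set.of("C");
           System.out.println(shouldStart("A", stable, pending, true)); // false: A must not start
           System.out.println(shouldStart("C", stable, pending, true)); // true: only C starts
       }
   }
   ```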



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java:
##########
@@ -345,6 +348,145 @@ void testManualRebalanceIfPartitionIsLost() throws Exception {
         assertThat(errors, is(empty()));
     }
 
+    @Test
+    @ZoneParams(nodes = 5, replicas = 3, partitions = 1)
+    void testManualRebalanceRecovery() throws Exception {
+        int partId = 0;
+        // Disable scale down to avoid unwanted rebalance.
+        executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, 300));
+
+        IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));

Review Comment:
   We have the helper `igniteImpl(0)`
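
   With that helper, the quoted line could simply read (a sketch based on the helper named in this comment):
   ```java
   IgniteImpl node0 = igniteImpl(0);
   ```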


