kgusakov commented on code in PR #4847: URL: https://github.com/apache/ignite-3/pull/4847#discussion_r1883073462
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -2235,10 +2263,25 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent( return; } + // A node might exist in stable yet we don't want to start the replica + // The case is: nodes A, B and C hold a replication group, + // nodes A and B die. + // Reset adds C into force pending and user writes data onto C. + // Then A and B go back online. In this case + // stable = [A, B, C], pending = [C, force] and only C should be started. + if (isRecovery && !replicaMgr.isReplicaStarted(replicaGrpId)) { Review Comment: Not sure that I understand: - How do we guarantee that this check will not break the non-HA case? - This piece of code does not start any nodes, but just updates the configuration of the raft client; shouldn't we rewrite the comment a little bit? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -1065,53 +1067,71 @@ private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters /** * Updates or creates partition raft groups and storages. * - * @param assignmentsFuture Table assignments. + * @param stableAssignmentsFuture Table assignments. * @param table Initialized table entity. - * @param zoneId Zone id. * @param isRecovery {@code true} if the node is being started up. * @param assignmentsTimestamp Assignments timestamp. * @return future, which will be completed when the partitions creations done. */ private CompletableFuture<Void> startLocalPartitionsAndClients( - CompletableFuture<List<Assignments>> assignmentsFuture, + CompletableFuture<List<Assignments>> stableAssignmentsFuture, + List<@Nullable Assignments> pendingAssignmentsForPartitions, TableImpl table, - int zoneId, boolean isRecovery, long assignmentsTimestamp ) { int tableId = table.tableId(); // Create new raft nodes according to new assignments. 
- return assignmentsFuture.thenCompose(tableAssignments -> { + return stableAssignmentsFuture.thenCompose(stableAssignmentsForPartitions -> { // Empty assignments might be a valid case if tables are created from within cluster init HOCON // configuration, which is not supported now. - assert tableAssignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId); + assert stableAssignmentsForPartitions != null + : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId); - int partitions = tableAssignments.size(); + int partitions = stableAssignmentsForPartitions.size(); var futures = new CompletableFuture<?>[partitions]; for (int i = 0; i < partitions; i++) { int partId = i; - Assignments assignments = tableAssignments.get(i); + Assignments stableAssignments = stableAssignmentsForPartitions.get(i); - CompletableFuture<?> future = startPartitionAndStartClient( - table, - partId, - localMemberAssignment(assignments), - assignments, - zoneId, - isRecovery, - assignmentsTimestamp - ) - .whenComplete((res, ex) -> { - if (ex != null) { - LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId); - } - }); + Assignments pendingAssignments = pendingAssignmentsForPartitions.get(i); + + Assignment localMemberAssignmentInStable = localMemberAssignment(stableAssignments); - futures[i] = future; + boolean shouldStartPartition; + + if (isRecovery) { + // The condition to start the replica is + // `pending.contains(node) || (stable.contains(node) && !pending.isForce())`. + // However we check only the right part of this condition here + // since after `startTables` we have a call to `processAssignmentsOnRecovery`, Review Comment: So, there is no way to put a {@link } in this place, because we can't use Javadoc here, am I right? 
########## modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java: ########## @@ -345,6 +348,145 @@ void testManualRebalanceIfPartitionIsLost() throws Exception { assertThat(errors, is(empty())); } + @Test + @ZoneParams(nodes = 5, replicas = 3, partitions = 1) + void testManualRebalanceRecovery() throws Exception { + int partId = 0; + // Disable scale down to avoid unwanted rebalance. + executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, 300)); + + IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0)); + int catalogVersion = node0.catalogManager().latestCatalogVersion(); + long timestamp = node0.catalogManager().catalog(catalogVersion).time(); + Table table = node0.tables().table(TABLE_NAME); + + awaitPrimaryReplica(node0, partId); + + Assignments allAssignments = Assignments.of(timestamp, + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(1).name()), + Assignment.forPeer(node(2).name()) + ); + + // The partition is replicated to 3 nodes. + assertRealAssignments(node0, partId, 0, 1, 2); + + List<Throwable> errors = insertValues(table, partId, 0); + assertThat(errors, is(empty())); + + // Make sure node 0 has the same data as node 1 and 2. + assertTrue( + waitForCondition(() -> getRaftLogIndex(0, partId).equals(getRaftLogIndex(1, partId)), SECONDS.toMillis(20)), + () -> "Node 0 log index = " + getRaftLogIndex(0, partId) + " node 1 log index= " + getRaftLogIndex(1, partId) + ); + assertTrue( + waitForCondition(() -> getRaftLogIndex(0, partId).equals(getRaftLogIndex(2, partId)), SECONDS.toMillis(20)), + () -> "Node 0 log index = " + getRaftLogIndex(0, partId) + " node 2 log index= " + getRaftLogIndex(2, partId) + ); + + // Stop 1 and 2, only 0 survived. 
+ stopNodesInParallel(1, 2); + + Assignments assignment0 = Assignments.of(timestamp, + Assignment.forPeer(node(0).name()) + ); + + // Blocking stable switch to the first phase or reset, + // so that we'll have force pending assignments unexecuted. + blockMessage((nodeName, msg) -> stableKeySwitchMessage(msg, partId, assignment0)); + + // Init reset: + // pending = [0, force] + // planned = [0, 3, 4] + CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( + zoneName, + QUALIFIED_TABLE_NAME, + true + ); + + assertThat(updateFuture, willCompleteSuccessfully()); + + awaitPrimaryReplica(node0, partId); + + // Since stable switch to 0 is blocked, data is stored on 0 node only. + assertRealAssignments(node0, partId, 0); + + // And stable is in its initial state. + assertStableAssignments(node0, partId, allAssignments); + + // Insert new data on the node 0. + errors = insertValues(table, partId, 10); + assertThat(errors, is(empty())); + + // Start nodes 1 and 2 back, but they should not start their replicas. + startNode(1); + startNode(2); + + // Make sure 1 and 2 did not start. + assertRealAssignments(node0, partId, 0); Review Comment: We may get a positive check here only because the replicas are not YET started at the moment of the check. I suppose startNode() will return before the actual recovery is executed (please double-check me). It looks like we need to find a mechanism to verify that the recovery is really done, or something like that. ########## modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java: ########## @@ -345,6 +348,145 @@ void testManualRebalanceIfPartitionIsLost() throws Exception { assertThat(errors, is(empty())); } + @Test + @ZoneParams(nodes = 5, replicas = 3, partitions = 1) + void testManualRebalanceRecovery() throws Exception { + int partId = 0; + // Disable scale down to avoid unwanted rebalance. 
+ executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, 300)); Review Comment: Can we use the clearer CatalogUtils.INFINITE_TIMER_VALUE constant here? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -1065,53 +1067,71 @@ private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters /** * Updates or creates partition raft groups and storages. * - * @param assignmentsFuture Table assignments. + * @param stableAssignmentsFuture Table assignments. * @param table Initialized table entity. - * @param zoneId Zone id. * @param isRecovery {@code true} if the node is being started up. * @param assignmentsTimestamp Assignments timestamp. * @return future, which will be completed when the partitions creations done. */ private CompletableFuture<Void> startLocalPartitionsAndClients( - CompletableFuture<List<Assignments>> assignmentsFuture, + CompletableFuture<List<Assignments>> stableAssignmentsFuture, + List<@Nullable Assignments> pendingAssignmentsForPartitions, TableImpl table, - int zoneId, boolean isRecovery, long assignmentsTimestamp ) { int tableId = table.tableId(); // Create new raft nodes according to new assignments. - return assignmentsFuture.thenCompose(tableAssignments -> { + return stableAssignmentsFuture.thenCompose(stableAssignmentsForPartitions -> { // Empty assignments might be a valid case if tables are created from within cluster init HOCON // configuration, which is not supported now. 
- assert tableAssignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId); + assert stableAssignmentsForPartitions != null + : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId); - int partitions = tableAssignments.size(); + int partitions = stableAssignmentsForPartitions.size(); var futures = new CompletableFuture<?>[partitions]; for (int i = 0; i < partitions; i++) { int partId = i; - Assignments assignments = tableAssignments.get(i); + Assignments stableAssignments = stableAssignmentsForPartitions.get(i); - CompletableFuture<?> future = startPartitionAndStartClient( - table, - partId, - localMemberAssignment(assignments), - assignments, - zoneId, - isRecovery, - assignmentsTimestamp - ) - .whenComplete((res, ex) -> { - if (ex != null) { - LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId); - } - }); + Assignments pendingAssignments = pendingAssignmentsForPartitions.get(i); + + Assignment localMemberAssignmentInStable = localMemberAssignment(stableAssignments); - futures[i] = future; + boolean shouldStartPartition; + + if (isRecovery) { + // The condition to start the replica is + // `pending.contains(node) || (stable.contains(node) && !pending.isForce())`. + // However we check only the right part of this condition here + // since after `startTables` we have a call to `processAssignmentsOnRecovery`, + // which executes pending assignments update an will start required partitions there. Review Comment: ```suggestion // which executes pending assignments update and will start required partitions there. 
``` ########## modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java: ########## @@ -345,6 +348,145 @@ void testManualRebalanceIfPartitionIsLost() throws Exception { assertThat(errors, is(empty())); } + @Test + @ZoneParams(nodes = 5, replicas = 3, partitions = 1) + void testManualRebalanceRecovery() throws Exception { + int partId = 0; + // Disable scale down to avoid unwanted rebalance. + executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, 300)); + + IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0)); Review Comment: We have the helper `igniteImpl(0)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org