ibessonov commented on code in PR #6023: URL: https://github.com/apache/ignite-3/pull/6023#discussion_r2213121155
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -3322,6 +3358,42 @@ public CompletableFuture<Void> restartPartition(TablePartitionId tablePartitionI }), ioExecutor)); } + /** + * Restarts the table partition including the replica and raft node. + * + * @param tablePartitionId Table partition that needs to be restarted. + * @param revision Metastore revision. + * @return Operation future. + */ + public CompletableFuture<Void> restartPartitionWithCleanUp( + TablePartitionId tablePartitionId, + long revision, + long assignmentsTimestamp Review Comment: This parameter is undocumented ########## modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java: ########## @@ -710,6 +710,8 @@ public static class DisasterRecovery { /** Error while returning partition states. */ public static final int CLUSTER_NOT_IDLE_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 4); + + public static final int RESTART_WITH_CLEAN_UP_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 5); Review Comment: Please add a small comment. It's a public API module ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java: ########## @@ -55,6 +59,14 @@ protected ManualGroupRestartRequest readExternalData(byte protoVer, IgniteDataIn Set<String> nodeNames = readStringSet(in); HybridTimestamp assignmentsTimestamp = HybridTimestamp.readFrom(in); - return new ManualGroupRestartRequest(operationId, zoneId, tableId, partitionIds, nodeNames, assignmentsTimestamp.longValue()); + return new ManualGroupRestartRequest( + operationId, + zoneId, + tableId, + partitionIds, + nodeNames, + assignmentsTimestamp.longValue(), + protoVer >= 2 && in.readBoolean() // Read the new 'force' field if protocol version is 2. Review Comment: Are you sure that inlining this expression is a good idea? 
I propose having all the `in.*` operations in a single place above `return`, thank you! ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java: ########## @@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa } } }); + } - return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new)); + private void restartPartitionWithCleanup( + DisasterRecoveryManager disasterRecoveryManager, + long revision, + HybridTimestamp timestamp, + ArrayList<CompletableFuture<?>> restartFutures + ) { + disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> { + ReplicationGroupId replicationGroupId = raftNodeId.groupId(); + + CatalogManager catalogManager = disasterRecoveryManager.catalogManager; + + Catalog catalog = catalogManager.activeCatalog(timestamp.longValue()); + + CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId); + + if (replicationGroupId instanceof TablePartitionId) { + TablePartitionId groupId = (TablePartitionId) replicationGroupId; + + if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) { + + if (zoneDescriptor.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY) { + if (zoneDescriptor.replicas() >= 2) { + restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp( + groupId, + revision, + assignmentsTimestamp + )); + } + } else { + restartFutures.add( + enoughAliveNodesToRestartWithCleanUp( + disasterRecoveryManager, + revision, + replicationGroupId, + zoneDescriptor, + catalog + ).thenCompose( + enoughNodes -> { + if (enoughNodes) { + return disasterRecoveryManager.tableManager.restartPartitionWithCleanUp( + groupId, + revision, + assignmentsTimestamp + ); + } else { + throw new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node " + + "to perform reset with clean up."); + } + } + ) + ); + } 
+ } + } else { + if (replicationGroupId instanceof ZonePartitionId) { + // todo support zone partitions Review Comment: Please add a Jira link to your TODO ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java: ########## @@ -99,6 +124,20 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa var restartFutures = new ArrayList<CompletableFuture<?>>(); + if (cleanUp) { + restartPartitionWithCleanup(disasterRecoveryManager, revision, timestamp, restartFutures); + } else { + restartPartition(disasterRecoveryManager, revision, restartFutures); + } + + return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new)); Review Comment: Let's also add a TODO to check what happens with these futures; I don't want partition restarts to stall the entire node ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java: ########## @@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa } } }); + } - return restartFutures.isEmpty() ? 
nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new)); + private void restartPartitionWithCleanup( + DisasterRecoveryManager disasterRecoveryManager, + long revision, + HybridTimestamp timestamp, + ArrayList<CompletableFuture<?>> restartFutures + ) { + disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> { + ReplicationGroupId replicationGroupId = raftNodeId.groupId(); + + CatalogManager catalogManager = disasterRecoveryManager.catalogManager; + + Catalog catalog = catalogManager.activeCatalog(timestamp.longValue()); + + CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId); + + if (replicationGroupId instanceof TablePartitionId) { + TablePartitionId groupId = (TablePartitionId) replicationGroupId; + + if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) { + + if (zoneDescriptor.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY) { + if (zoneDescriptor.replicas() >= 2) { + restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp( + groupId, + revision, + assignmentsTimestamp + )); + } + } else { + restartFutures.add( + enoughAliveNodesToRestartWithCleanUp( + disasterRecoveryManager, + revision, + replicationGroupId, + zoneDescriptor, + catalog + ).thenCompose( + enoughNodes -> { + if (enoughNodes) { + return disasterRecoveryManager.tableManager.restartPartitionWithCleanUp( + groupId, + revision, + assignmentsTimestamp + ); + } else { + throw new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node " + + "to perform reset with clean up."); + } + } + ) + ); + } + } + } else { + if (replicationGroupId instanceof ZonePartitionId) { + // todo support zone partitions + } + } + }); + } + + private CompletableFuture<Boolean> enoughAliveNodesToRestartWithCleanUp( + DisasterRecoveryManager disasterRecoveryManager, + long msRevision, + ReplicationGroupId replicationGroupId, + CatalogZoneDescriptor zoneDescriptor, + Catalog catalog + ) { + 
if (zoneDescriptor.replicas() <= 2) { + return CompletableFuture.completedFuture(false); + } + + TablePartitionId tablePartitionId = (TablePartitionId) replicationGroupId; Review Comment: Unsafe cast, please add a TODO with the issue where you'll support colocated code, or change the type of the argument ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java: ########## @@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa } } }); + } - return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new)); + private void restartPartitionWithCleanup( + DisasterRecoveryManager disasterRecoveryManager, + long revision, + HybridTimestamp timestamp, + ArrayList<CompletableFuture<?>> restartFutures + ) { + disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> { + ReplicationGroupId replicationGroupId = raftNodeId.groupId(); + + CatalogManager catalogManager = disasterRecoveryManager.catalogManager; + + Catalog catalog = catalogManager.activeCatalog(timestamp.longValue()); + + CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId); + + if (replicationGroupId instanceof TablePartitionId) { + TablePartitionId groupId = (TablePartitionId) replicationGroupId; + + if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) { + + if (zoneDescriptor.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY) { + if (zoneDescriptor.replicas() >= 2) { + restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp( + groupId, + revision, + assignmentsTimestamp + )); + } + } else { + restartFutures.add( + enoughAliveNodesToRestartWithCleanUp( + disasterRecoveryManager, + revision, + replicationGroupId, + zoneDescriptor, + catalog + ).thenCompose( + enoughNodes -> { + if (enoughNodes) { + return 
disasterRecoveryManager.tableManager.restartPartitionWithCleanUp( + groupId, + revision, + assignmentsTimestamp + ); + } else { + throw new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node " + + "to perform reset with clean up."); + } + } + ) + ); + } + } + } else { + if (replicationGroupId instanceof ZonePartitionId) { + // todo support zone partitions + } + } + }); + } + + private CompletableFuture<Boolean> enoughAliveNodesToRestartWithCleanUp( + DisasterRecoveryManager disasterRecoveryManager, + long msRevision, + ReplicationGroupId replicationGroupId, + CatalogZoneDescriptor zoneDescriptor, + Catalog catalog + ) { + if (zoneDescriptor.replicas() <= 2) { + return CompletableFuture.completedFuture(false); Review Comment: We have `CompletableFutures.falseCompletedFuture()` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java: ########## @@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa } } }); + } - return restartFutures.isEmpty() ? 
nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new)); + private void restartPartitionWithCleanup( + DisasterRecoveryManager disasterRecoveryManager, + long revision, + HybridTimestamp timestamp, + ArrayList<CompletableFuture<?>> restartFutures + ) { + disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> { + ReplicationGroupId replicationGroupId = raftNodeId.groupId(); + + CatalogManager catalogManager = disasterRecoveryManager.catalogManager; + + Catalog catalog = catalogManager.activeCatalog(timestamp.longValue()); + + CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId); + + if (replicationGroupId instanceof TablePartitionId) { + TablePartitionId groupId = (TablePartitionId) replicationGroupId; + + if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) { + Review Comment: ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org