alievmirza commented on code in PR #6023: URL: https://github.com/apache/ignite-3/pull/6023#discussion_r2219172255
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java:
##########
@@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa
                 }
             }
         });
+    }
-        return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new));
+    private void restartPartitionWithCleanup(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long revision,
+            HybridTimestamp timestamp,
+            ArrayList<CompletableFuture<?>> restartFutures
+    ) {
+        disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> {
+            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+
+            CatalogManager catalogManager = disasterRecoveryManager.catalogManager;
+
+            Catalog catalog = catalogManager.activeCatalog(timestamp.longValue());
+
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+            if (replicationGroupId instanceof TablePartitionId) {
+                TablePartitionId groupId = (TablePartitionId) replicationGroupId;
+
+                if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) {
+
+                    if (zoneDescriptor.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY) {
+                        if (zoneDescriptor.replicas() >= 2) {
+                            restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                    groupId,
+                                    revision,
+                                    assignmentsTimestamp
+                            ));
+                        }
+                    } else {
+                        restartFutures.add(
+                                enoughAliveNodesToRestartWithCleanUp(
+                                        disasterRecoveryManager,
+                                        revision,
+                                        replicationGroupId,
+                                        zoneDescriptor,
+                                        catalog
+                                ).thenCompose(
+                                        enoughNodes -> {
+                                            if (enoughNodes) {
+                                                return disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                                        groupId,
+                                                        revision,
+                                                        assignmentsTimestamp
+                                                );
+                                            } else {
+                                                throw new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node "
+                                                        + "to perform reset with clean up.");
+                                            }
+                                        }
+                                )
+                        );
+                    }
+                }
+            } else {
+                if (replicationGroupId instanceof ZonePartitionId) {
+                    // todo support zone partitions
+                }
+            }
+        });
+    }
+
+    private CompletableFuture<Boolean> enoughAliveNodesToRestartWithCleanUp(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long msRevision,
+            ReplicationGroupId replicationGroupId,
+            CatalogZoneDescriptor zoneDescriptor,
+            Catalog catalog
+    ) {
+        if (zoneDescriptor.replicas() <= 2) {
+            return CompletableFuture.completedFuture(false);
+        }
+
+        TablePartitionId tablePartitionId = (TablePartitionId) replicationGroupId;

Review Comment:
   added

##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java:
##########
@@ -55,6 +59,14 @@ protected ManualGroupRestartRequest readExternalData(byte protoVer, IgniteDataIn
         Set<String> nodeNames = readStringSet(in);
         HybridTimestamp assignmentsTimestamp = HybridTimestamp.readFrom(in);
-        return new ManualGroupRestartRequest(operationId, zoneId, tableId, partitionIds, nodeNames, assignmentsTimestamp.longValue());
+        return new ManualGroupRestartRequest(
+                operationId,
+                zoneId,
+                tableId,
+                partitionIds,
+                nodeNames,
+                assignmentsTimestamp.longValue(),
+                protoVer >= 2 && in.readBoolean() // Read the new 'force' field if protocol version is 2.

Review Comment:
   refactored

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org