alievmirza commented on code in PR #6023: URL: https://github.com/apache/ignite-3/pull/6023#discussion_r2219171586
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -3322,6 +3358,42 @@ public CompletableFuture<Void> restartPartition(TablePartitionId tablePartitionI }), ioExecutor)); } + /** + * Restarts the table partition including the replica and raft node. + * + * @param tablePartitionId Table partition that needs to be restarted. + * @param revision Metastore revision. + * @return Operation future. + */ + public CompletableFuture<Void> restartPartitionWithCleanUp( + TablePartitionId tablePartitionId, + long revision, + long assignmentsTimestamp Review Comment: documented ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java: ########## @@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa } } }); + } - return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new)); + private void restartPartitionWithCleanup( + DisasterRecoveryManager disasterRecoveryManager, + long revision, + HybridTimestamp timestamp, + ArrayList<CompletableFuture<?>> restartFutures + ) { + disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> { + ReplicationGroupId replicationGroupId = raftNodeId.groupId(); + + CatalogManager catalogManager = disasterRecoveryManager.catalogManager; + + Catalog catalog = catalogManager.activeCatalog(timestamp.longValue()); + + CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId); + + if (replicationGroupId instanceof TablePartitionId) { + TablePartitionId groupId = (TablePartitionId) replicationGroupId; + + if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) { + + if (zoneDescriptor.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY) { + if (zoneDescriptor.replicas() >= 2) { + restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp( + groupId, + revision, + assignmentsTimestamp + )); + } + } else { + restartFutures.add( + enoughAliveNodesToRestartWithCleanUp( + disasterRecoveryManager, + revision, + replicationGroupId, + zoneDescriptor, + catalog + ).thenCompose( + enoughNodes -> { + if (enoughNodes) { + return disasterRecoveryManager.tableManager.restartPartitionWithCleanUp( + groupId, + revision, + assignmentsTimestamp + ); + } else { + throw new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node " + + "to perform reset with clean up."); + } + } + ) + ); + } + } + } else { + if (replicationGroupId instanceof ZonePartitionId) { + // todo support zone partitions Review Comment: added ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java: ########## @@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa } } }); + } - return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new)); + private void restartPartitionWithCleanup( + DisasterRecoveryManager disasterRecoveryManager, + long revision, + HybridTimestamp timestamp, + ArrayList<CompletableFuture<?>> restartFutures + ) { + disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> { + ReplicationGroupId replicationGroupId = raftNodeId.groupId(); + + CatalogManager catalogManager = disasterRecoveryManager.catalogManager; + + Catalog catalog = catalogManager.activeCatalog(timestamp.longValue()); + + CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId); + + if (replicationGroupId instanceof TablePartitionId) { + TablePartitionId groupId = (TablePartitionId) replicationGroupId; + + if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) { + Review Comment: done ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java: ########## @@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa } } }); + } - return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new)); + private void restartPartitionWithCleanup( + DisasterRecoveryManager disasterRecoveryManager, + long revision, + HybridTimestamp timestamp, + ArrayList<CompletableFuture<?>> restartFutures + ) { + disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> { + ReplicationGroupId replicationGroupId = raftNodeId.groupId(); + + CatalogManager catalogManager = disasterRecoveryManager.catalogManager; + + Catalog catalog = catalogManager.activeCatalog(timestamp.longValue()); + + CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId); + + if (replicationGroupId instanceof TablePartitionId) { + TablePartitionId groupId = (TablePartitionId) replicationGroupId; + + if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) { + + if (zoneDescriptor.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY) { + if (zoneDescriptor.replicas() >= 2) { + restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp( + groupId, + revision, + assignmentsTimestamp + )); + } + } else { + restartFutures.add( + enoughAliveNodesToRestartWithCleanUp( + disasterRecoveryManager, + revision, + replicationGroupId, + zoneDescriptor, + catalog + ).thenCompose( + enoughNodes -> { + if (enoughNodes) { + return disasterRecoveryManager.tableManager.restartPartitionWithCleanUp( + groupId, + revision, + assignmentsTimestamp + ); + } else { + throw new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node " + + "to perform reset with clean up."); + } + } + ) + ); + } + } + } else { + if (replicationGroupId instanceof ZonePartitionId) { + // todo support zone partitions + } + } + }); + } + + private CompletableFuture<Boolean> enoughAliveNodesToRestartWithCleanUp( + DisasterRecoveryManager disasterRecoveryManager, + long msRevision, + ReplicationGroupId replicationGroupId, + CatalogZoneDescriptor zoneDescriptor, + Catalog catalog + ) { + if (zoneDescriptor.replicas() <= 2) { + return CompletableFuture.completedFuture(false); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org