alievmirza commented on code in PR #6023:
URL: https://github.com/apache/ignite-3/pull/6023#discussion_r2219171586


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -3322,6 +3358,42 @@ public CompletableFuture<Void> 
restartPartition(TablePartitionId tablePartitionI
         }), ioExecutor));
     }
 
+    /**
+     * Restarts the table partition including the replica and raft node.
+     *
+     * @param tablePartitionId Table partition that needs to be restarted.
+     * @param revision Metastore revision.
+     * @return Operation future.
+     */
+    public CompletableFuture<Void> restartPartitionWithCleanUp(
+            TablePartitionId tablePartitionId,
+            long revision,
+            long assignmentsTimestamp

Review Comment:
   documented 



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java:
##########
@@ -124,8 +163,112 @@ public CompletableFuture<Void> 
handle(DisasterRecoveryManager disasterRecoveryMa
                 }
             }
         });
+    }
 
-        return restartFutures.isEmpty() ? nullCompletedFuture() : 
allOf(restartFutures.toArray(CompletableFuture[]::new));
+    private void restartPartitionWithCleanup(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long revision,
+            HybridTimestamp timestamp,
+            ArrayList<CompletableFuture<?>> restartFutures
+    ) {
+        disasterRecoveryManager.raftManager.forEach((raftNodeId, 
raftGroupService) -> {
+            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+
+            CatalogManager catalogManager = 
disasterRecoveryManager.catalogManager;
+
+            Catalog catalog = 
catalogManager.activeCatalog(timestamp.longValue());
+
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+            if (replicationGroupId instanceof TablePartitionId) {
+                TablePartitionId groupId = (TablePartitionId) 
replicationGroupId;
+
+                if (groupId.tableId() == tableId && 
partitionIds.contains(groupId.partitionId())) {
+
+                    if (zoneDescriptor.consistencyMode() == 
ConsistencyMode.HIGH_AVAILABILITY) {
+                        if (zoneDescriptor.replicas() >= 2) {
+                            
restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                    groupId,
+                                    revision,
+                                    assignmentsTimestamp
+                            ));
+                        }
+                    } else {
+                        restartFutures.add(
+                                enoughAliveNodesToRestartWithCleanUp(
+                                    disasterRecoveryManager,
+                                    revision,
+                                    replicationGroupId,
+                                    zoneDescriptor,
+                                    catalog
+                                ).thenCompose(
+                                    enoughNodes -> {
+                                        if (enoughNodes) {
+                                            return 
disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                                    groupId,
+                                                    revision,
+                                                    assignmentsTimestamp
+                                            );
+                                        } else {
+                                            throw new 
DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node "
+                                                    + "to perform reset with 
clean up.");
+                                        }
+                                    }
+                                )
+                        );
+                    }
+                }
+            } else {
+                if (replicationGroupId instanceof ZonePartitionId) {
+                    // todo support zone partitions

Review Comment:
   added



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java:
##########
@@ -124,8 +163,112 @@ public CompletableFuture<Void> 
handle(DisasterRecoveryManager disasterRecoveryMa
                 }
             }
         });
+    }
 
-        return restartFutures.isEmpty() ? nullCompletedFuture() : 
allOf(restartFutures.toArray(CompletableFuture[]::new));
+    private void restartPartitionWithCleanup(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long revision,
+            HybridTimestamp timestamp,
+            ArrayList<CompletableFuture<?>> restartFutures
+    ) {
+        disasterRecoveryManager.raftManager.forEach((raftNodeId, 
raftGroupService) -> {
+            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+
+            CatalogManager catalogManager = 
disasterRecoveryManager.catalogManager;
+
+            Catalog catalog = 
catalogManager.activeCatalog(timestamp.longValue());
+
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+            if (replicationGroupId instanceof TablePartitionId) {
+                TablePartitionId groupId = (TablePartitionId) 
replicationGroupId;
+
+                if (groupId.tableId() == tableId && 
partitionIds.contains(groupId.partitionId())) {
+

Review Comment:
   done



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java:
##########
@@ -124,8 +163,112 @@ public CompletableFuture<Void> 
handle(DisasterRecoveryManager disasterRecoveryMa
                 }
             }
         });
+    }
 
-        return restartFutures.isEmpty() ? nullCompletedFuture() : 
allOf(restartFutures.toArray(CompletableFuture[]::new));
+    private void restartPartitionWithCleanup(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long revision,
+            HybridTimestamp timestamp,
+            ArrayList<CompletableFuture<?>> restartFutures
+    ) {
+        disasterRecoveryManager.raftManager.forEach((raftNodeId, 
raftGroupService) -> {
+            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+
+            CatalogManager catalogManager = 
disasterRecoveryManager.catalogManager;
+
+            Catalog catalog = 
catalogManager.activeCatalog(timestamp.longValue());
+
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+            if (replicationGroupId instanceof TablePartitionId) {
+                TablePartitionId groupId = (TablePartitionId) 
replicationGroupId;
+
+                if (groupId.tableId() == tableId && 
partitionIds.contains(groupId.partitionId())) {
+
+                    if (zoneDescriptor.consistencyMode() == 
ConsistencyMode.HIGH_AVAILABILITY) {
+                        if (zoneDescriptor.replicas() >= 2) {
+                            
restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                    groupId,
+                                    revision,
+                                    assignmentsTimestamp
+                            ));
+                        }
+                    } else {
+                        restartFutures.add(
+                                enoughAliveNodesToRestartWithCleanUp(
+                                    disasterRecoveryManager,
+                                    revision,
+                                    replicationGroupId,
+                                    zoneDescriptor,
+                                    catalog
+                                ).thenCompose(
+                                    enoughNodes -> {
+                                        if (enoughNodes) {
+                                            return 
disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                                    groupId,
+                                                    revision,
+                                                    assignmentsTimestamp
+                                            );
+                                        } else {
+                                            throw new 
DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node "
+                                                    + "to perform reset with 
clean up.");
+                                        }
+                                    }
+                                )
+                        );
+                    }
+                }
+            } else {
+                if (replicationGroupId instanceof ZonePartitionId) {
+                    // todo support zone partitions
+                }
+            }
+        });
+    }
+
+    private CompletableFuture<Boolean> enoughAliveNodesToRestartWithCleanUp(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long msRevision,
+            ReplicationGroupId replicationGroupId,
+            CatalogZoneDescriptor zoneDescriptor,
+            Catalog catalog
+    ) {
+        if (zoneDescriptor.replicas() <= 2) {
+            return CompletableFuture.completedFuture(false);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to