ibessonov commented on code in PR #6023:
URL: https://github.com/apache/ignite-3/pull/6023#discussion_r2213121155


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -3322,6 +3358,42 @@ public CompletableFuture<Void> restartPartition(TablePartitionId tablePartitionI
         }), ioExecutor));
     }
 
+    /**
+     * Restarts the table partition including the replica and raft node.
+     *
+     * @param tablePartitionId Table partition that needs to be restarted.
+     * @param revision Metastore revision.
+     * @return Operation future.
+     */
+    public CompletableFuture<Void> restartPartitionWithCleanUp(
+            TablePartitionId tablePartitionId,
+            long revision,
+            long assignmentsTimestamp

Review Comment:
   This parameter (`assignmentsTimestamp`) is undocumented.



##########
modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java:
##########
@@ -710,6 +710,8 @@ public static class DisasterRecovery {
 
         /** Error while returning partition states. */
         public static final int CLUSTER_NOT_IDLE_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 4);
+
+        public static final int RESTART_WITH_CLEAN_UP_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 5);

Review Comment:
   Please add a short Javadoc comment; this is a public API module.
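A sketch of what the requested comment could look like. `ErrorGroupSketch` is a hypothetical stand-in for the error group behind `RECOVERY_ERR_GROUP`; the group code `9` and the "group code in the high 16 bits" encoding are assumptions made only for illustration, not Ignite's actual values. The Javadoc wording is inferred from the constant name and from the "Not enough alive node to perform reset with clean up." error raised in `ManualGroupRestartRequest`, so it should be adjusted to the real semantics.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for an error group that hands out unique error codes. */
final class ErrorGroupSketch {
    private final short groupCode;
    private final Set<Short> registered = new HashSet<>();

    ErrorGroupSketch(short groupCode) {
        this.groupCode = groupCode;
    }

    /** Registers a code in this group; the group code goes into the high 16 bits (assumed encoding). */
    int registerErrorCode(short errorCode) {
        if (!registered.add(errorCode)) {
            throw new IllegalArgumentException("Duplicate error code: " + errorCode);
        }
        return (groupCode << 16) | (errorCode & 0xFFFF);
    }
}

final class DisasterRecoveryCodesSketch {
    /** Hypothetical group; the real RECOVERY_ERR_GROUP code may differ. */
    static final ErrorGroupSketch RECOVERY_ERR_GROUP = new ErrorGroupSketch((short) 9);

    /**
     * Partition restart with clean up was rejected, for example because not enough alive nodes
     * remain to restore the data (assumed wording; adjust to the actual semantics).
     */
    static final int RESTART_WITH_CLEAN_UP_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 5);

    private DisasterRecoveryCodesSketch() {
    }
}
```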



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java:
##########
@@ -55,6 +59,14 @@ protected ManualGroupRestartRequest readExternalData(byte protoVer, IgniteDataIn
         Set<String> nodeNames = readStringSet(in);
         HybridTimestamp assignmentsTimestamp = HybridTimestamp.readFrom(in);
 
-        return new ManualGroupRestartRequest(operationId, zoneId, tableId, partitionIds, nodeNames, assignmentsTimestamp.longValue());
+        return new ManualGroupRestartRequest(
+                operationId,
+                zoneId,
+                tableId,
+                partitionIds,
+                nodeNames,
+                assignmentsTimestamp.longValue(),
+                protoVer >= 2 && in.readBoolean() // Read the new 'force' field if protocol version is 2.

Review Comment:
   Are you sure that inlining this expression is a good idea? I propose having all the `in.*` operations in a single place above `return`, thank you!
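A minimal sketch of the layout proposed above, assuming plain `java.io.DataInputStream`/`DataOutputStream` as stand-ins for Ignite's `IgniteDataInput`/`IgniteDataOutput`, and a hypothetical two-field `RestartRequest` in place of `ManualGroupRestartRequest`. Every read happens in field order above the `return`, so the version-dependent `force` read sits on its own line instead of being buried in the constructor arguments.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical, simplified stand-in for ManualGroupRestartRequest. */
final class RestartRequest {
    final int zoneId;
    final boolean force;

    RestartRequest(int zoneId, boolean force) {
        this.zoneId = zoneId;
        this.force = force;
    }
}

/** Sketch of a versioned serializer that keeps every read above the return. */
final class RestartRequestSerializer {
    static byte[] write(RestartRequest request, byte protoVer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(request.zoneId);
            if (protoVer >= 2) {
                // The 'force' flag only exists in protocol version 2 and later.
                out.writeBoolean(request.force);
            }
        }
        return bytes.toByteArray();
    }

    static RestartRequest read(byte protoVer, DataInputStream in) throws IOException {
        // All reads happen here, in field order, before the object is built.
        int zoneId = in.readInt();
        // Short-circuit: for older payloads the stream is not touched and force defaults to false.
        boolean force = protoVer >= 2 && in.readBoolean();

        return new RestartRequest(zoneId, force);
    }

    static RestartRequest roundTrip(RestartRequest request, byte protoVer) {
        try {
            byte[] payload = write(request, protoVer);
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
                RestartRequest result = read(protoVer, in);
                // Every written byte must have been consumed by the reader.
                if (in.available() != 0) {
                    throw new IllegalStateException("Trailing bytes left in payload");
                }
                return result;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because `&&` short-circuits, a version 1 payload never reaches `readBoolean()`, so older messages deserialize with `force == false` without over-reading the stream.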



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java:
##########
@@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa
                 }
             }
         });
+    }
 
-        return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new));
+    private void restartPartitionWithCleanup(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long revision,
+            HybridTimestamp timestamp,
+            ArrayList<CompletableFuture<?>> restartFutures
+    ) {
+        disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> {
+            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+
+            CatalogManager catalogManager = disasterRecoveryManager.catalogManager;
+
+            Catalog catalog = catalogManager.activeCatalog(timestamp.longValue());
+
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+            if (replicationGroupId instanceof TablePartitionId) {
+                TablePartitionId groupId = (TablePartitionId) replicationGroupId;
+
+                if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) {
+
+                    if (zoneDescriptor.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY) {
+                        if (zoneDescriptor.replicas() >= 2) {
+                            restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                    groupId,
+                                    revision,
+                                    assignmentsTimestamp
+                            ));
+                        }
+                    } else {
+                        restartFutures.add(
+                                enoughAliveNodesToRestartWithCleanUp(
+                                    disasterRecoveryManager,
+                                    revision,
+                                    replicationGroupId,
+                                    zoneDescriptor,
+                                    catalog
+                                ).thenCompose(
+                                    enoughNodes -> {
+                                        if (enoughNodes) {
+                                            return disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                                    groupId,
+                                                    revision,
+                                                    assignmentsTimestamp
+                                            );
+                                        } else {
+                                            throw new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node "
+                                                    + "to perform reset with clean up.");
+                                        }
+                                    }
+                                )
+                        );
+                    }
+                }
+            } else {
+                if (replicationGroupId instanceof ZonePartitionId) {
+                    // todo support zone partitions

Review Comment:
   Please add a Jira link to your TODO



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java:
##########
@@ -99,6 +124,20 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa
 
         var restartFutures = new ArrayList<CompletableFuture<?>>();
 
+        if (cleanUp) {
+            restartPartitionWithCleanup(disasterRecoveryManager, revision, timestamp, restartFutures);
+        } else {
+            restartPartition(disasterRecoveryManager, revision, restartFutures);
+        }
+
+        return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new));

Review Comment:
   Let's also add a TODO to check what happens with these futures; I don't want partition restarts to stall the entire node.
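For context on this concern: `CompletableFuture.allOf` completes only when every input completes, completes exceptionally if any input fails (including when an exception is thrown inside a `thenCompose` callback, as in the "not enough alive nodes" branch), and never completes if a single restart hangs. With no arguments it already returns a completed future. The sketch below shows these behaviors using only `java.util.concurrent`. Bounding the wait with `orTimeout` (Java 9+) is one possible mitigation, not something the PR does, and `joinRestarts`/`restartIfEnoughNodes` are hypothetical helpers.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helpers illustrating how the combined restart future behaves. */
final class RestartFuturesSketch {
    private RestartFuturesSketch() {
    }

    /**
     * Combines per-partition restart futures. The result fails if any restart fails and,
     * because of orTimeout, fails with a TimeoutException instead of waiting forever on a stuck restart.
     */
    static CompletableFuture<Void> joinRestarts(List<CompletableFuture<?>> restartFutures, long timeoutMillis) {
        CompletableFuture<Void> all = CompletableFuture.allOf(restartFutures.toArray(new CompletableFuture<?>[0]));
        return all.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Mirrors the PR's branch: an exception thrown inside thenCompose fails the returned future. */
    static CompletableFuture<Void> restartIfEnoughNodes(CompletableFuture<Boolean> enoughNodes) {
        return enoughNodes.thenCompose(enough -> {
            if (!enough) {
                throw new IllegalStateException("Not enough alive nodes to perform reset with clean up.");
            }
            // Stand-in for tableManager.restartPartitionWithCleanUp(...).
            return CompletableFuture.<Void>completedFuture(null);
        });
    }
}
```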



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java:
##########
@@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa
                 }
             }
         });
+    }
 
-        return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new));
+    private void restartPartitionWithCleanup(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long revision,
+            HybridTimestamp timestamp,
+            ArrayList<CompletableFuture<?>> restartFutures
+    ) {
+        disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> {
+            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+
+            CatalogManager catalogManager = disasterRecoveryManager.catalogManager;
+
+            Catalog catalog = catalogManager.activeCatalog(timestamp.longValue());
+
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+            if (replicationGroupId instanceof TablePartitionId) {
+                TablePartitionId groupId = (TablePartitionId) replicationGroupId;
+
+                if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) {
+
+                    if (zoneDescriptor.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY) {
+                        if (zoneDescriptor.replicas() >= 2) {
+                            restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                    groupId,
+                                    revision,
+                                    assignmentsTimestamp
+                            ));
+                        }
+                    } else {
+                        restartFutures.add(
+                                enoughAliveNodesToRestartWithCleanUp(
+                                    disasterRecoveryManager,
+                                    revision,
+                                    replicationGroupId,
+                                    zoneDescriptor,
+                                    catalog
+                                ).thenCompose(
+                                    enoughNodes -> {
+                                        if (enoughNodes) {
+                                            return disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                                    groupId,
+                                                    revision,
+                                                    assignmentsTimestamp
+                                            );
+                                        } else {
+                                            throw new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node "
+                                                    + "to perform reset with clean up.");
+                                        }
+                                    }
+                                )
+                        );
+                    }
+                }
+            } else {
+                if (replicationGroupId instanceof ZonePartitionId) {
+                    // todo support zone partitions
+                }
+            }
+        });
+    }
+
+    private CompletableFuture<Boolean> enoughAliveNodesToRestartWithCleanUp(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long msRevision,
+            ReplicationGroupId replicationGroupId,
+            CatalogZoneDescriptor zoneDescriptor,
+            Catalog catalog
+    ) {
+        if (zoneDescriptor.replicas() <= 2) {
+            return CompletableFuture.completedFuture(false);
+        }
+
+        TablePartitionId tablePartitionId = (TablePartitionId) replicationGroupId;

Review Comment:
   Unsafe cast. Please add a TODO referencing the issue in which you'll support colocated code, or change the type of the argument.
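Both options from the comment can be sketched with hypothetical stand-ins: `GroupId`, `TablePartId` and `ZonePartId` model `ReplicationGroupId`, `TablePartitionId` and `ZonePartitionId`, and `enoughAliveNodes` models only the `replicas() <= 2` guard from the diff. The helper takes the narrower type, so the cast disappears from it, and the caller performs a single `instanceof` dispatch that rejects colocated ids explicitly instead of failing with a `ClassCastException`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for ReplicationGroupId. */
interface GroupId {
}

/** Hypothetical stand-in for TablePartitionId. */
final class TablePartId implements GroupId {
    final int tableId;
    final int partitionId;

    TablePartId(int tableId, int partitionId) {
        this.tableId = tableId;
        this.partitionId = partitionId;
    }
}

/** Hypothetical stand-in for ZonePartitionId (colocated partitions). */
final class ZonePartId implements GroupId {
    final int zoneId;
    final int partitionId;

    ZonePartId(int zoneId, int partitionId) {
        this.zoneId = zoneId;
        this.partitionId = partitionId;
    }
}

final class CleanUpCheckSketch {
    private CleanUpCheckSketch() {
    }

    /** Accepting the narrow type makes the cast unnecessary inside the helper. */
    static CompletableFuture<Boolean> enoughAliveNodes(TablePartId partId, int replicas) {
        if (replicas <= 2) {
            return CompletableFuture.completedFuture(false);
        }
        // A real check would inspect partId's assignments; this sketch only models the replica-count guard.
        return CompletableFuture.completedFuture(true);
    }

    /** Dispatches on the id type once; colocated ids are rejected explicitly. */
    static CompletableFuture<Boolean> check(GroupId groupId, int replicas) {
        if (groupId instanceof TablePartId) {
            return enoughAliveNodes((TablePartId) groupId, replicas);
        }
        // TODO: support zone (colocated) partitions; reference the tracking issue here.
        return CompletableFuture.failedFuture(
                new UnsupportedOperationException("Zone partitions are not supported yet: " + groupId));
    }
}
```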



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java:
##########
@@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa
                 }
             }
         });
+    }
 
-        return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new));
+    private void restartPartitionWithCleanup(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long revision,
+            HybridTimestamp timestamp,
+            ArrayList<CompletableFuture<?>> restartFutures
+    ) {
+        disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> {
+            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+
+            CatalogManager catalogManager = disasterRecoveryManager.catalogManager;
+
+            Catalog catalog = catalogManager.activeCatalog(timestamp.longValue());
+
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+            if (replicationGroupId instanceof TablePartitionId) {
+                TablePartitionId groupId = (TablePartitionId) replicationGroupId;
+
+                if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) {
+
+                    if (zoneDescriptor.consistencyMode() == ConsistencyMode.HIGH_AVAILABILITY) {
+                        if (zoneDescriptor.replicas() >= 2) {
+                            restartFutures.add(disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                    groupId,
+                                    revision,
+                                    assignmentsTimestamp
+                            ));
+                        }
+                    } else {
+                        restartFutures.add(
+                                enoughAliveNodesToRestartWithCleanUp(
+                                    disasterRecoveryManager,
+                                    revision,
+                                    replicationGroupId,
+                                    zoneDescriptor,
+                                    catalog
+                                ).thenCompose(
+                                    enoughNodes -> {
+                                        if (enoughNodes) {
+                                            return disasterRecoveryManager.tableManager.restartPartitionWithCleanUp(
+                                                    groupId,
+                                                    revision,
+                                                    assignmentsTimestamp
+                                            );
+                                        } else {
+                                            throw new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not enough alive node "
+                                                    + "to perform reset with clean up.");
+                                        }
+                                    }
+                                )
+                        );
+                    }
+                }
+            } else {
+                if (replicationGroupId instanceof ZonePartitionId) {
+                    // todo support zone partitions
+                }
+            }
+        });
+    }
+
+    private CompletableFuture<Boolean> enoughAliveNodesToRestartWithCleanUp(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long msRevision,
+            ReplicationGroupId replicationGroupId,
+            CatalogZoneDescriptor zoneDescriptor,
+            Catalog catalog
+    ) {
+        if (zoneDescriptor.replicas() <= 2) {
+            return CompletableFuture.completedFuture(false);

Review Comment:
   We have `CompletableFutures.falseCompletedFuture()`
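A helper like `CompletableFutures.falseCompletedFuture()` exists to hand out one shared, already completed instance instead of allocating a new future on every call. A minimal sketch of the idea, with a hypothetical `FutureConstants` class standing in for Ignite's utility (whose actual implementation may differ):

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a shared completed-future utility. */
final class FutureConstants {
    /** Created once and reused by every caller. */
    private static final CompletableFuture<Boolean> FALSE_FUTURE = CompletableFuture.completedFuture(false);

    private FutureConstants() {
    }

    /** Returns a shared future already completed with false; no allocation per call. */
    static CompletableFuture<Boolean> falseCompletedFuture() {
        return FALSE_FUTURE;
    }
}
```

Sharing one instance is safe for ordinary callers because `complete()` on an already completed future is a no-op that returns `false`; only `obtrudeValue()` could change it, and a shared constant should never be obtruded. With such a helper the early exit becomes `return falseCompletedFuture();`.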



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java:
##########
@@ -124,8 +163,112 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa
                 }
             }
         });
+    }
 
-        return restartFutures.isEmpty() ? nullCompletedFuture() : allOf(restartFutures.toArray(CompletableFuture[]::new));
+    private void restartPartitionWithCleanup(
+            DisasterRecoveryManager disasterRecoveryManager,
+            long revision,
+            HybridTimestamp timestamp,
+            ArrayList<CompletableFuture<?>> restartFutures
+    ) {
+        disasterRecoveryManager.raftManager.forEach((raftNodeId, raftGroupService) -> {
+            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+
+            CatalogManager catalogManager = disasterRecoveryManager.catalogManager;
+
+            Catalog catalog = catalogManager.activeCatalog(timestamp.longValue());
+
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+            if (replicationGroupId instanceof TablePartitionId) {
+                TablePartitionId groupId = (TablePartitionId) replicationGroupId;
+
+                if (groupId.tableId() == tableId && partitionIds.contains(groupId.partitionId())) {
+

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
