alievmirza commented on code in PR #4843:
URL: https://github.com/apache/ignite-3/pull/4843#discussion_r1889809546


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -907,6 +981,10 @@ private static CatalogZoneDescriptor zoneDescriptor(Catalog catalog, String zone
         return zoneDescriptor;
     }
 
+    private static ByteArray zoneRecoveryTriggerRevisionKey(int zoneId) {
+        return new ByteArray(RECOVERY_TRIGGER_REVISION_KEY_PREFIX + "." + zoneId);

Review Comment:
   Why can't we add "." to `RECOVERY_TRIGGER_REVISION_KEY_PREFIX` itself?
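For illustration, here is a minimal sketch of that, assuming the prefix is a plain `String` constant. The prefix value is hypothetical because its real value is not part of this diff. The method returns `byte[]` instead of Ignite's `ByteArray` only to keep the snippet self-contained.

```java
import java.nio.charset.StandardCharsets;

final class RecoveryTriggerKeys {
    // Hypothetical value; the trailing "." separator is part of the prefix,
    // so no call site has to append it when building a per-zone key.
    static final String RECOVERY_TRIGGER_REVISION_KEY_PREFIX = "disaster.recovery.trigger.revision.";

    private RecoveryTriggerKeys() {
    }

    /** Builds the per-zone key; mirrors {@code zoneRecoveryTriggerRevisionKey} from the diff. */
    static byte[] zoneRecoveryTriggerRevisionKey(int zoneId) {
        return (RECOVERY_TRIGGER_REVISION_KEY_PREFIX + zoneId).getBytes(StandardCharsets.UTF_8);
    }
}
```

If these keys are ever read back with a prefix scan, keeping the separator inside the prefix also keeps the scan from matching unrelated keys that merely start with the same word.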



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -337,21 +350,44 @@ public CompletableFuture<Void> resetPartitions(String zoneName, String tableName
      * so that a new leader could be elected.
      *
      * @param zoneName Name of the distribution zone. Case-sensitive, without quotes.
-     * @param tableId Table id.
+     * @param tableName Fully-qualified table name. Case-sensitive, without quotes. Example: "PUBLIC.Foo".
      * @param partitionIds IDs of partitions to reset. If empty, reset all zone's partitions.
      * @param manualUpdate Whether the update is triggered manually by user or automatically by core logic.
+     * @param triggerRevision Revision of the event, which produce this reset. -1 for manual reset.
      * @return Future that completes when partitions are reset.
      */
-    private CompletableFuture<Void> resetPartitions(String zoneName, int tableId, Set<Integer> partitionIds, boolean manualUpdate) {
+    private CompletableFuture<Void> resetPartitions(
+            String zoneName, String tableName, Set<Integer> partitionIds, boolean manualUpdate, long triggerRevision) {
+        int tableId = tableDescriptor(catalogLatestVersion(), tableName).id();
+
+        return resetPartitions(zoneName, Map.of(tableId, partitionIds), manualUpdate, triggerRevision);
+    }
+
+    /**
+     * Updates assignments of the table in a forced manner, allowing for the recovery of raft group with lost majorities. It is achieved via
+     * triggering a new rebalance with {@code force} flag enabled in {@link Assignments} for partitions where it's required. New pending
+     * assignments with {@code force} flag remove old stable nodes from the distribution, and force new Raft configuration via "resetPeers"
+     * so that a new leader could be elected.
+     *
+     * @param zoneName Name of the distribution zone. Case-sensitive, without quotes.
+     * @param partitionIds Map of per table partitions' sets to reset. If empty, reset all zone's partitions.
+     * @param manualUpdate Whether the update is triggered manually by user or automatically by core logic.
+     * @param triggerRevision Revision of the event, which produce this reset. -1 for manual reset.
+     * @return Future that completes when partitions are reset.
+     */
+    private CompletableFuture<Void> resetPartitions(

Review Comment:
   ```suggestion
       private CompletableFuture<Void> resetPartitions(
               String zoneName,
               Map<Integer, Set<Integer>> partitionIds,
               boolean manualUpdate,
               long triggerRevision
       ) {
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java:
##########
@@ -149,10 +146,17 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa
         Catalog catalog = disasterRecoveryManager.catalogManager.catalog(catalogVersion);
 
         CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
-        CatalogTableDescriptor tableDescriptor = catalog.table(tableId);
+
+        Set<Integer> allZonePartitionsToReset = new HashSet<>();
+        partitionIds.values().forEach(allZonePartitionsToReset::addAll);
 
         CompletableFuture<Map<TablePartitionId, LocalPartitionStateMessageByNode>> localStates
-                = disasterRecoveryManager.localPartitionStatesInternal(Set.of(zoneDescriptor.name()), emptySet(), partitionIds, catalog);
+                = disasterRecoveryManager.localPartitionStatesInternal(

Review Comment:
   That is a very interesting way of formatting...
   You never cease to amaze me :) 



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -337,21 +350,44 @@ public CompletableFuture<Void> resetPartitions(String zoneName, String tableName
      * so that a new leader could be elected.
      *
      * @param zoneName Name of the distribution zone. Case-sensitive, without quotes.
-     * @param tableId Table id.
+     * @param tableName Fully-qualified table name. Case-sensitive, without quotes. Example: "PUBLIC.Foo".
      * @param partitionIds IDs of partitions to reset. If empty, reset all zone's partitions.
      * @param manualUpdate Whether the update is triggered manually by user or automatically by core logic.
+     * @param triggerRevision Revision of the event, which produce this reset. -1 for manual reset.
      * @return Future that completes when partitions are reset.
      */
-    private CompletableFuture<Void> resetPartitions(String zoneName, int tableId, Set<Integer> partitionIds, boolean manualUpdate) {
+    private CompletableFuture<Void> resetPartitions(
+            String zoneName, String tableName, Set<Integer> partitionIds, boolean manualUpdate, long triggerRevision) {
+        int tableId = tableDescriptor(catalogLatestVersion(), tableName).id();
+
+        return resetPartitions(zoneName, Map.of(tableId, partitionIds), manualUpdate, triggerRevision);
+    }
+
+    /**
+     * Updates assignments of the table in a forced manner, allowing for the recovery of raft group with lost majorities. It is achieved via
+     * triggering a new rebalance with {@code force} flag enabled in {@link Assignments} for partitions where it's required. New pending
+     * assignments with {@code force} flag remove old stable nodes from the distribution, and force new Raft configuration via "resetPeers"
+     * so that a new leader could be elected.
+     *
+     * @param zoneName Name of the distribution zone. Case-sensitive, without quotes.
+     * @param partitionIds Map of per table partitions' sets to reset. If empty, reset all zone's partitions.
+     * @param manualUpdate Whether the update is triggered manually by user or automatically by core logic.
+     * @param triggerRevision Revision of the event, which produce this reset. -1 for manual reset.
+     * @return Future that completes when partitions are reset.
+     */
+    private CompletableFuture<Void> resetPartitions(
+            String zoneName, Map<Integer, Set<Integer>> partitionIds, boolean manualUpdate, long triggerRevision) {
         try {
             Catalog catalog = catalogLatestVersion();
 
             CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
 
-            checkPartitionsRange(partitionIds, Set.of(zone));
+            partitionIds.values().forEach(ids ->
+                    checkPartitionsRange(ids, Set.of(zone)));

Review Comment:
   Redundant line break.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -569,26 +605,64 @@ private static Collection<CatalogZoneDescriptor> filterZones(Set<String> zoneNam
         return zoneDescriptors;
     }
 
+    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) {
+        return processNewRequest(request, -1);
+    }
+
     /**
      * Creates new operation future, associated with the request, and writes it into meta-storage.
      *
      * @param request Request.
+     * @param revision Revision of event, which produce this recovery request.
      * @return Operation future.
      */
-    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) {
+    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request, long revision) {
         UUID operationId = request.operationId();
 
         CompletableFuture<Void> operationFuture = new CompletableFuture<Void>()
                 .whenComplete((v, throwable) -> ongoingOperationsById.remove(operationId))
                 .orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
+        byte[] serializedRequest = VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE);
+
         ongoingOperationsById.put(operationId, operationFuture);
 
-        metaStorageManager.put(RECOVERY_TRIGGER_KEY, VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE));
+        if (revision != -1) {
+            putRecoveryTriggerIfRevisionIsNotProcessed(
+                    request.zoneId(),
+                    longToBytesKeepingOrder(revision),
+                    serializedRequest,
+                    operationId
+            );
+        } else {
+            metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
+        }
 
         return operationFuture;
     }
 
+    private void putRecoveryTriggerIfRevisionIsNotProcessed(

Review Comment:
   Javadoc is missing 
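One possible shape for that Javadoc, attached to a simplified in-memory model. The semantics are inferred from the method name and arguments in this diff, so treat them as an assumption: write the trigger only if the zone's stored trigger revision is absent or older. Several parts of the model are stand-ins:

- The `Map` stands in for meta-storage, and the key values are hypothetical.
- The `operationId` argument is left out.
- `encodeRevision` is a local stand-in for the order-preserving encoding that `longToBytesKeepingOrder` presumably performs; it is not Ignite's implementation.

A real version would need an atomic conditional update in meta-storage rather than a `synchronized` block.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class InMemoryRecoveryTriggers {
    // Hypothetical key names; the real values are not part of this diff.
    static final String RECOVERY_TRIGGER_KEY = "disaster.recovery.trigger";
    static final String REVISION_KEY_PREFIX = "disaster.recovery.trigger.revision.";

    // Stand-in for the meta-storage.
    private final Map<String, byte[]> storage = new HashMap<>();

    /**
     * Writes the recovery trigger for the zone, unless a trigger produced by the same or a later
     * event revision has already been written for that zone.
     *
     * @param zoneId Zone ID.
     * @param revisionBytes Event revision, encoded so that unsigned lexicographic byte order matches numeric order.
     * @param serializedRequest Serialized recovery request.
     * @return {@code true} if the trigger was written, {@code false} if the revision was already processed.
     */
    synchronized boolean putRecoveryTriggerIfRevisionIsNotProcessed(int zoneId, byte[] revisionBytes, byte[] serializedRequest) {
        String revisionKey = REVISION_KEY_PREFIX + zoneId;
        byte[] processed = storage.get(revisionKey);

        // Same or newer event already triggered a recovery for this zone: nothing to do.
        if (processed != null && Arrays.compareUnsigned(processed, revisionBytes) >= 0) {
            return false;
        }

        storage.put(revisionKey, revisionBytes);
        storage.put(RECOVERY_TRIGGER_KEY, serializedRequest);

        return true;
    }

    /** Big-endian encoding with the sign bit flipped, so unsigned byte order matches signed long order. */
    static byte[] encodeRevision(long value) {
        long flipped = value ^ Long.MIN_VALUE;
        byte[] bytes = new byte[Long.BYTES];

        for (int i = Long.BYTES - 1; i >= 0; i--) {
            bytes[i] = (byte) flipped;
            flipped >>>= 8;
        }

        return bytes;
    }
}
```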



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -569,26 +605,64 @@ private static Collection<CatalogZoneDescriptor> filterZones(Set<String> zoneNam
         return zoneDescriptors;
     }
 
+    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) {

Review Comment:
   Please add Javadoc with a reference to `processNewRequest(DisasterRecoveryRequest request, long revision)`.
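Something along these lines, for example. Three simplifications make the snippet compile on its own:

- `DisasterRecoveryRequest` is reduced to a hypothetical stub.
- The `-1` sentinel gets a hypothetical constant name, `MANUAL_TRIGGER_REVISION`.
- The two-argument overload only records the revision instead of writing to meta-storage.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical stub standing in for the real request type.
final class DisasterRecoveryRequest {
    final UUID operationId;

    DisasterRecoveryRequest(UUID operationId) {
        this.operationId = operationId;
    }
}

final class RequestProcessor {
    /** Hypothetical name for the revision used for manual (user-triggered) requests. */
    static final long MANUAL_TRIGGER_REVISION = -1;

    /** Revision passed to the last call, kept only so the sketch can be checked. */
    long lastRevision;

    /**
     * Processes a manually triggered request that is not tied to any meta-storage event.
     * Delegates to {@link #processNewRequest(DisasterRecoveryRequest, long)} with {@link #MANUAL_TRIGGER_REVISION}.
     *
     * @param request Request.
     * @return Operation future.
     */
    CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) {
        return processNewRequest(request, MANUAL_TRIGGER_REVISION);
    }

    /**
     * Simplified stand-in for the two-argument overload from the diff.
     *
     * @param request Request.
     * @param revision Revision of the event that produced this request, or {@link #MANUAL_TRIGGER_REVISION}.
     * @return Operation future.
     */
    CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request, long revision) {
        lastRevision = revision;
        return CompletableFuture.completedFuture(null);
    }
}
```

Naming the sentinel also lets the `@param revision` docs on both overloads point at one place instead of repeating the magic `-1`.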



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -569,26 +605,64 @@ private static Collection<CatalogZoneDescriptor> filterZones(Set<String> zoneNam
         return zoneDescriptors;
     }
 
+    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) {
+        return processNewRequest(request, -1);
+    }
+
     /**
      * Creates new operation future, associated with the request, and writes it into meta-storage.
      *
      * @param request Request.
+     * @param revision Revision of event, which produce this recovery request.
      * @return Operation future.
      */
-    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) {
+    private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request, long revision) {
         UUID operationId = request.operationId();
 
         CompletableFuture<Void> operationFuture = new CompletableFuture<Void>()
                 .whenComplete((v, throwable) -> ongoingOperationsById.remove(operationId))
                 .orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
+        byte[] serializedRequest = VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE);
+
         ongoingOperationsById.put(operationId, operationFuture);
 
-        metaStorageManager.put(RECOVERY_TRIGGER_KEY, VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE));
+        if (revision != -1) {
+            putRecoveryTriggerIfRevisionIsNotProcessed(
+                    request.zoneId(),
+                    longToBytesKeepingOrder(revision),
+                    serializedRequest,
+                    operationId
+            );
+        } else {

Review Comment:
   Please add a comment explaining why it is OK to put the value to `RECOVERY_TRIGGER_KEY` directly for a manual request, bypassing `zoneRecoveryTriggerRevisionKey`.
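A possible wording for that comment, shown in a simplified model of the branch. The rationale is inferred from the diff only: automatic requests carry the revision of the event that produced them, and manual ones pass `-1`. It may not match the author's actual reasoning. Several parts of the model are stand-ins:

- `MANUAL_TRIGGER_REVISION` and the key value are hypothetical.
- The `Map`-based stand-ins replace meta-storage.
- The `synchronized` guard replaces what would need to be an atomic conditional write in meta-storage.

```java
import java.util.HashMap;
import java.util.Map;

final class RecoveryTriggerWriter {
    /** Hypothetical name for the "-1" revision passed for manual requests. */
    static final long MANUAL_TRIGGER_REVISION = -1;

    /** Hypothetical key value; the real constant's value is not part of this diff. */
    static final String RECOVERY_TRIGGER_KEY = "disaster.recovery.trigger";

    // Stand-ins for meta-storage contents.
    final Map<String, byte[]> metaStorage = new HashMap<>();
    private final Map<Integer, Long> processedRevisionByZone = new HashMap<>();

    /** Returns {@code true} if the trigger was written. */
    synchronized boolean writeTrigger(int zoneId, byte[] serializedRequest, long revision) {
        if (revision == MANUAL_TRIGGER_REVISION) {
            // Manual requests are issued once per user call and are not derived from a
            // meta-storage event, so there is no event revision that could be handled twice.
            // Deduplication by the per-zone trigger revision key is therefore not needed,
            // and the request is written to RECOVERY_TRIGGER_KEY directly.
            metaStorage.put(RECOVERY_TRIGGER_KEY, serializedRequest);
            return true;
        }

        // Automatic request: skip it if this zone already handled this or a later event revision.
        Long processed = processedRevisionByZone.get(zoneId);
        if (processed != null && processed >= revision) {
            return false;
        }

        processedRevisionByZone.put(zoneId, revision);
        metaStorage.put(RECOVERY_TRIGGER_KEY, serializedRequest);
        return true;
    }
}
```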



-- 
This is an automated message from the Apache Git Service.