alievmirza commented on code in PR #4843: URL: https://github.com/apache/ignite-3/pull/4843#discussion_r1889809546
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -907,6 +981,10 @@ private static CatalogZoneDescriptor zoneDescriptor(Catalog catalog, String zone return zoneDescriptor; } + private static ByteArray zoneRecoveryTriggerRevisionKey(int zoneId) { + return new ByteArray(RECOVERY_TRIGGER_REVISION_KEY_PREFIX + "." + zoneId); Review Comment: Why can't we add "." to `RECOVERY_TRIGGER_REVISION_KEY_PREFIX` itself? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -337,21 +350,44 @@ public CompletableFuture<Void> resetPartitions(String zoneName, String tableName * so that a new leader could be elected. * * @param zoneName Name of the distribution zone. Case-sensitive, without quotes. - * @param tableId Table id. + * @param tableName Fully-qualified table name. Case-sensitive, without quotes. Example: "PUBLIC.Foo". * @param partitionIds IDs of partitions to reset. If empty, reset all zone's partitions. * @param manualUpdate Whether the update is triggered manually by user or automatically by core logic. + * @param triggerRevision Revision of the event, which produce this reset. -1 for manual reset. * @return Future that completes when partitions are reset. */ - private CompletableFuture<Void> resetPartitions(String zoneName, int tableId, Set<Integer> partitionIds, boolean manualUpdate) { + private CompletableFuture<Void> resetPartitions( + String zoneName, String tableName, Set<Integer> partitionIds, boolean manualUpdate, long triggerRevision) { + int tableId = tableDescriptor(catalogLatestVersion(), tableName).id(); + + return resetPartitions(zoneName, Map.of(tableId, partitionIds), manualUpdate, triggerRevision); + } + + /** + * Updates assignments of the table in a forced manner, allowing for the recovery of raft group with lost majorities.
It is achieved via + * triggering a new rebalance with {@code force} flag enabled in {@link Assignments} for partitions where it's required. New pending + * assignments with {@code force} flag remove old stable nodes from the distribution, and force new Raft configuration via "resetPeers" + * so that a new leader could be elected. + * + * @param zoneName Name of the distribution zone. Case-sensitive, without quotes. + * @param partitionIds Map of per table partitions' sets to reset. If empty, reset all zone's partitions. + * @param manualUpdate Whether the update is triggered manually by user or automatically by core logic. + * @param triggerRevision Revision of the event, which produce this reset. -1 for manual reset. + * @return Future that completes when partitions are reset. + */ + private CompletableFuture<Void> resetPartitions( Review Comment: ```suggestion private CompletableFuture<Void> resetPartitions( String zoneName, Map<Integer, Set<Integer>> partitionIds, boolean manualUpdate, long triggerRevision ) { ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java: ########## @@ -149,10 +146,17 @@ public CompletableFuture<Void> handle(DisasterRecoveryManager disasterRecoveryMa Catalog catalog = disasterRecoveryManager.catalogManager.catalog(catalogVersion); CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId); - CatalogTableDescriptor tableDescriptor = catalog.table(tableId); + + Set<Integer> allZonePartitionsToReset = new HashSet<>(); + partitionIds.values().forEach(allZonePartitionsToReset::addAll); CompletableFuture<Map<TablePartitionId, LocalPartitionStateMessageByNode>> localStates - = disasterRecoveryManager.localPartitionStatesInternal(Set.of(zoneDescriptor.name()), emptySet(), partitionIds, catalog); + = disasterRecoveryManager.localPartitionStatesInternal( Review Comment: That is a very interesting way of formatting...
You never cease to amaze me :) ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -337,21 +350,44 @@ public CompletableFuture<Void> resetPartitions(String zoneName, String tableName * so that a new leader could be elected. * * @param zoneName Name of the distribution zone. Case-sensitive, without quotes. - * @param tableId Table id. + * @param tableName Fully-qualified table name. Case-sensitive, without quotes. Example: "PUBLIC.Foo". * @param partitionIds IDs of partitions to reset. If empty, reset all zone's partitions. * @param manualUpdate Whether the update is triggered manually by user or automatically by core logic. + * @param triggerRevision Revision of the event, which produce this reset. -1 for manual reset. * @return Future that completes when partitions are reset. */ - private CompletableFuture<Void> resetPartitions(String zoneName, int tableId, Set<Integer> partitionIds, boolean manualUpdate) { + private CompletableFuture<Void> resetPartitions( + String zoneName, String tableName, Set<Integer> partitionIds, boolean manualUpdate, long triggerRevision) { + int tableId = tableDescriptor(catalogLatestVersion(), tableName).id(); + + return resetPartitions(zoneName, Map.of(tableId, partitionIds), manualUpdate, triggerRevision); + } + + /** + * Updates assignments of the table in a forced manner, allowing for the recovery of raft group with lost majorities. It is achieved via + * triggering a new rebalance with {@code force} flag enabled in {@link Assignments} for partitions where it's required. New pending + * assignments with {@code force} flag remove old stable nodes from the distribution, and force new Raft configuration via "resetPeers" + * so that a new leader could be elected. + * + * @param zoneName Name of the distribution zone. Case-sensitive, without quotes. + * @param partitionIds Map of per table partitions' sets to reset. If empty, reset all zone's partitions. 
+ * @param manualUpdate Whether the update is triggered manually by user or automatically by core logic. + * @param triggerRevision Revision of the event, which produce this reset. -1 for manual reset. + * @return Future that completes when partitions are reset. + */ + private CompletableFuture<Void> resetPartitions( + String zoneName, Map<Integer, Set<Integer>> partitionIds, boolean manualUpdate, long triggerRevision) { try { Catalog catalog = catalogLatestVersion(); CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName); - checkPartitionsRange(partitionIds, Set.of(zone)); + partitionIds.values().forEach(ids -> + checkPartitionsRange(ids, Set.of(zone))); Review Comment: redundant new line ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -569,26 +605,64 @@ private static Collection<CatalogZoneDescriptor> filterZones(Set<String> zoneNam return zoneDescriptors; } + private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) { + return processNewRequest(request, -1); + } + /** * Creates new operation future, associated with the request, and writes it into meta-storage. * * @param request Request. + * @param revision Revision of event, which produce this recovery request. * @return Operation future. 
*/ - private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) { + private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request, long revision) { UUID operationId = request.operationId(); CompletableFuture<Void> operationFuture = new CompletableFuture<Void>() .whenComplete((v, throwable) -> ongoingOperationsById.remove(operationId)) .orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS); + byte[] serializedRequest = VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE); + ongoingOperationsById.put(operationId, operationFuture); - metaStorageManager.put(RECOVERY_TRIGGER_KEY, VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE)); + if (revision != -1) { + putRecoveryTriggerIfRevisionIsNotProcessed( + request.zoneId(), + longToBytesKeepingOrder(revision), + serializedRequest, + operationId + ); + } else { + metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest); + } return operationFuture; } + private void putRecoveryTriggerIfRevisionIsNotProcessed( Review Comment: Javadoc is missing. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -569,26 +605,64 @@ private static Collection<CatalogZoneDescriptor> filterZones(Set<String> zoneNam return zoneDescriptors; } + private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) { Review Comment: Please add Javadoc with a reference to `processNewRequest(DisasterRecoveryRequest request, long revision)`. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -569,26 +605,64 @@ private static Collection<CatalogZoneDescriptor> filterZones(Set<String> zoneNam return zoneDescriptors; } + private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) { + return processNewRequest(request, -1); + } + /** + * Creates new
operation future, associated with the request, and writes it into meta-storage. * * @param request Request. + * @param revision Revision of event, which produce this recovery request. * @return Operation future. */ - private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request) { + private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest request, long revision) { UUID operationId = request.operationId(); CompletableFuture<Void> operationFuture = new CompletableFuture<Void>() .whenComplete((v, throwable) -> ongoingOperationsById.remove(operationId)) .orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS); + byte[] serializedRequest = VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE); + ongoingOperationsById.put(operationId, operationFuture); - metaStorageManager.put(RECOVERY_TRIGGER_KEY, VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE)); + if (revision != -1) { + putRecoveryTriggerIfRevisionIsNotProcessed( + request.zoneId(), + longToBytesKeepingOrder(revision), + serializedRequest, + operationId + ); + } else { Review Comment: Please add a comment explaining why it is OK, for a manual request, to put the value directly to `RECOVERY_TRIGGER_KEY`, bypassing the `zoneRecoveryTriggerRevisionKey` check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org