rpuch commented on code in PR #5544: URL: https://github.com/apache/ignite-3/pull/5544#discussion_r2030536847
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -448,6 +452,112 @@ public CompletableFuture<Void> restartPartitions( } } + /** + * Returns states of partitions in the cluster. Result is a mapping of {@link ZonePartitionId} to the mapping + * between a node name and a partition state. + * + * @param zoneNames Names specifying zones to get partition states from. Case-sensitive, empty set means "all zones". + * @param nodeNames Names specifying nodes to get partition states from. Case-sensitive, empty set means "all nodes". + * @param partitionIds IDs of partitions to get states of. Empty set means "all partitions". + * @return Future with the mapping. + */ + public CompletableFuture<Map<ZonePartitionId, LocalZonePartitionStateByNode>> localZonePartitionStates( + Set<String> zoneNames, + Set<String> nodeNames, + Set<Integer> partitionIds + ) { + try { + Catalog catalog = catalogLatestVersion(); + + return localZonePartitionStatesInternal(zoneNames, nodeNames, partitionIds, catalog) + .thenApply(res -> normalizeZoneLocal(res, catalog)); + } catch (Throwable t) { + return failedFuture(t); + } + } + + /** + * Returns states of partitions in the cluster. Result is a mapping of {@link ZonePartitionId} to the global + * partition state enum value. Review Comment: `GlobalZonePartitionState` is not an enum ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -448,6 +452,112 @@ public CompletableFuture<Void> restartPartitions( } } + /** + * Returns states of partitions in the cluster. Result is a mapping of {@link ZonePartitionId} to the mapping + * between a node name and a partition state. + * + * @param zoneNames Names specifying zones to get partition states from. Case-sensitive, empty set means "all zones". + * @param nodeNames Names specifying nodes to get partition states from. 
Case-sensitive, empty set means "all nodes". + * @param partitionIds IDs of partitions to get states of. Empty set means "all partitions". + * @return Future with the mapping. + */ + public CompletableFuture<Map<ZonePartitionId, LocalZonePartitionStateByNode>> localZonePartitionStates( + Set<String> zoneNames, + Set<String> nodeNames, + Set<Integer> partitionIds + ) { + try { + Catalog catalog = catalogLatestVersion(); + + return localZonePartitionStatesInternal(zoneNames, nodeNames, partitionIds, catalog) + .thenApply(res -> normalizeZoneLocal(res, catalog)); + } catch (Throwable t) { + return failedFuture(t); + } + } + + /** + * Returns states of partitions in the cluster. Result is a mapping of {@link ZonePartitionId} to the global + * partition state enum value. + * + * @param zoneNames Names specifying zones to get partition states. Case-sensitive, empty set means "all zones". + * @param partitionIds IDs of partitions to get states of. Empty set means "all partitions". + * @return Future with the mapping. + */ + public CompletableFuture<Map<ZonePartitionId, GlobalZonePartitionState>> globalZonePartitionStates( + Set<String> zoneNames, + Set<Integer> partitionIds + ) { + try { + Catalog catalog = catalogLatestVersion(); + + return localZonePartitionStatesInternal(zoneNames, Set.of(), partitionIds, catalog) + .thenApply(res -> normalizeZoneLocal(res, catalog)) + .thenApply(res -> assembleZoneGlobal(res, partitionIds, catalog)); + } catch (Throwable t) { + return failedFuture(t); + } + } + + CompletableFuture<Map<ZonePartitionId, LocalPartitionStateMessageByNode>> localZonePartitionStatesInternal( Review Comment: This method duplicates `localPartitionStatesInternal()`; the only differences between them seem to be the type of the replication group ID and the way it's extracted from the response. It seems easy to generalize this code to avoid duplication.
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -947,6 +1238,41 @@ private static void putUnavailableStateIfAbsent( ); } + private static GlobalZonePartitionState assembleGlobalZoneStateFromLocal( + Catalog catalog, + ZonePartitionId zonePartitionId, + LocalZonePartitionStateByNode map + ) { + CatalogZoneDescriptor zoneDescriptor = catalog.zone(zonePartitionId.zoneId()); + + int replicas = zoneDescriptor.replicas(); + int quorum = replicas / 2 + 1; Review Comment: Please extract this to a method ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalZonePartitionStateByNode.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.disaster; + +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; + +/** Container for LocalPartitionState to node name map. */ Review Comment: ```suggestion /** Container for LocalZonePartitionState to node name map. 
*/ ``` ########## modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java: ########## @@ -95,6 +127,140 @@ void testRestartPartitions() { assertThat(selectAll(), hasSize(4)); } + @Test + @ZoneParams(nodes = 2, replicas = 2, partitions = 2) + void testLocalPartitionStateTable() throws Exception { Review Comment: Please add ``` @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true") ``` to all table-only tests so that, when we run these tests on a colocation-enabled branch, table tests don't suddenly fail. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -448,6 +452,112 @@ public CompletableFuture<Void> restartPartitions( } } + /** + * Returns states of partitions in the cluster. Result is a mapping of {@link ZonePartitionId} to the mapping + * between a node name and a partition state. + * + * @param zoneNames Names specifying zones to get partition states from. Case-sensitive, empty set means "all zones". + * @param nodeNames Names specifying nodes to get partition states from. Case-sensitive, empty set means "all nodes". + * @param partitionIds IDs of partitions to get states of. Empty set means "all partitions". + * @return Future with the mapping. + */ + public CompletableFuture<Map<ZonePartitionId, LocalZonePartitionStateByNode>> localZonePartitionStates( Review Comment: Given that zone partitions will remain and table partitions will soon be removed, would it make sense to rename the existing method (`localPartitionStates` -> `localTablePartitionStates`) and name the new method without the 'Zone' in it?
This concerns all the names that have 'Zone' in them ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -848,6 +1053,31 @@ private static Map<TablePartitionId, LocalPartitionStateByNode> normalizeLocal( return map; } + private static LocalZonePartitionState toZoneLocalPartitionState( + LocalPartitionStateMessage stateMsg, + long maxLogIndex, + ZonePartitionId zonePartitionId, + Catalog catalog + ) { + LocalPartitionStateEnum stateEnum = stateMsg.state(); + + if (stateEnum == HEALTHY && maxLogIndex - stateMsg.logIndex() >= CATCH_UP_THRESHOLD) { + stateEnum = CATCHING_UP; + } Review Comment: Let's extract this replacement logic to a method. It's now used in 2 places (and we don't know for how long it will be this way), so easily-extractable non-trivial logic should be extracted ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -947,6 +1238,41 @@ private static void putUnavailableStateIfAbsent( ); } + private static GlobalZonePartitionState assembleGlobalZoneStateFromLocal( + Catalog catalog, + ZonePartitionId zonePartitionId, + LocalZonePartitionStateByNode map + ) { + CatalogZoneDescriptor zoneDescriptor = catalog.zone(zonePartitionId.zoneId()); + + int replicas = zoneDescriptor.replicas(); + int quorum = replicas / 2 + 1; + + Map<LocalPartitionStateEnum, List<LocalZonePartitionState>> groupedStates = map.values().stream() + .collect(groupingBy(localPartitionState -> localPartitionState.state)); + + GlobalPartitionStateEnum globalStateEnum; + + int healthyReplicas = groupedStates.getOrDefault(HEALTHY, emptyList()).size(); + + if (healthyReplicas == replicas) { + globalStateEnum = AVAILABLE; + } else if (healthyReplicas >= quorum) { + globalStateEnum = DEGRADED; + } else if (healthyReplicas > 0) { + globalStateEnum = READ_ONLY; + } else { + globalStateEnum = GlobalPartitionStateEnum.UNAVAILABLE; + 
} + Review Comment: Please extract this as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org