denis-chudov commented on code in PR #5092:
URL: https://github.com/apache/ignite-3/pull/5092#discussion_r1961863802


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java:
##########
@@ -0,0 +1,1492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.lang.Math.max;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.catalog.descriptors.ConsistencyMode.HIGH_AVAILABILITY;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneTimer.DEFAULT_TIMER;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_SCALE_DOWN_TIMER_PREFIX;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_SCALE_UP_TIMER_PREFIX;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodeHistoryContextFromValues;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.nodeNames;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonePartitionResetTimerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractZoneId;
+import static org.apache.ignite.internal.lang.Pair.pair;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static 
org.apache.ignite.internal.metastorage.dsl.Conditions.notTombstone;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
+import static org.apache.ignite.internal.util.CollectionUtils.union;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import 
org.apache.ignite.internal.distributionzones.DataNodesHistory.DataNodesHistorySerializer;
+import 
org.apache.ignite.internal.distributionzones.DistributionZoneTimer.DistributionZoneTimerSerializer;
+import 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DataNodeHistoryContext;
+import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.lang.Pair;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Condition;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Manager for data nodes of distribution zones.
+ * <br>
+ * It is used by {@link DistributionZoneManager} to calculate data nodes, see 
{@link #dataNodes(int, HybridTimestamp)}.
+ * Data nodes are stored in meta storage as {@link DataNodesHistory} for each 
zone, also for each zone there are scale up and
+ * scale down timer, stored as {@link DistributionZoneTimer}. Partition reset 
timer is calculated on the node recovery according
+ * to the scale down timer state, see {@link #restorePartitionResetTimer(int, 
DistributionZoneTimer, long)}.
+ * <br>
+ * Data nodes history is appended on topology changes, on zone filter changes 
and on zone auto adjust alterations (i.e. alterations of
+ * scale up and scale down timer configuration), see {@link 
#onTopologyChange}, {@link #onZoneFilterChange} and
+ * {@link #onAutoAdjustAlteration} methods respectively.
+ */
+public class DataNodesManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(DataNodesManager.class);
+
+    private static final int MAX_ATTEMPTS_ON_RETRY = 100;
+
+    private final MetaStorageManager metaStorageManager;
+
+    private final CatalogManager catalogManager;
+
+    private final ClockService clockService;
+
+    /** External busy lock. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /**
+     * Map with zone id as a key and set of zone timers as a value.
+     */
+    private final Map<Integer, ZoneTimers> zoneTimers  = new 
ConcurrentHashMap<>();
+
+    /** Executor for scheduling tasks for scale up and scale down processes. */
+    private final StripedScheduledThreadPoolExecutor executor;
+
+    private final String localNodeName;
+
+    private final WatchListener scaleUpTimerPrefixListener;
+
+    private final WatchListener scaleDownTimerPrefixListener;
+
+    private final WatchListener dataNodesListener;
+
+    private final Map<Integer, DataNodesHistory> dataNodesHistoryVolatile = 
new ConcurrentHashMap<>();
+
+    private final BiConsumer<Long, Integer> partitionResetClosure;
+
+    private final IntSupplier partitionDistributionResetTimeoutSupplier;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Local node name.
+     * @param busyLock External busy lock.
+     * @param metaStorageManager Meta storage manager.
+     * @param catalogManager Catalog manager.
+     * @param clockService Clock service.
+     * @param partitionResetClosure Closure to reset partitions.
+     * @param partitionDistributionResetTimeoutSupplier Supplier for partition 
distribution reset timeout.
+     */
+    public DataNodesManager(
+            String nodeName,
+            IgniteSpinBusyLock busyLock,
+            MetaStorageManager metaStorageManager,
+            CatalogManager catalogManager,
+            ClockService clockService,
+            BiConsumer<Long, Integer> partitionResetClosure,
+            IntSupplier partitionDistributionResetTimeoutSupplier
+    ) {
+        this.metaStorageManager = metaStorageManager;
+        this.catalogManager = catalogManager;
+        this.clockService = clockService;
+        this.localNodeName = nodeName;
+        this.partitionResetClosure = partitionResetClosure;
+        this.partitionDistributionResetTimeoutSupplier = 
partitionDistributionResetTimeoutSupplier;
+        this.busyLock = busyLock;
+
+        executor = createZoneManagerExecutor(
+                Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
+                NamedThreadFactory.create(nodeName, "dst-zones-scheduler", LOG)
+        );
+
+        scaleUpTimerPrefixListener = createScaleUpTimerPrefixListener();
+        scaleDownTimerPrefixListener = createScaleDownTimerPrefixListener();
+        dataNodesListener = createDataNodesListener();
+    }
+
+    CompletableFuture<Void> startAsync(
+            Collection<CatalogZoneDescriptor> knownZones,
+            long recoveryRevision
+    ) {
+        metaStorageManager.registerPrefixWatch(zoneScaleUpTimerPrefix(), 
scaleUpTimerPrefixListener);
+        metaStorageManager.registerPrefixWatch(zoneScaleDownTimerPrefix(), 
scaleDownTimerPrefixListener);
+        metaStorageManager.registerPrefixWatch(zoneDataNodesHistoryPrefix(), 
dataNodesListener);
+
+        if (knownZones.isEmpty()) {
+            return nullCompletedFuture();
+        }
+
+        Set<ByteArray> allKeys = new HashSet<>();
+        Map<Integer, CatalogZoneDescriptor> descriptors = new HashMap<>();
+
+        for (CatalogZoneDescriptor zone : knownZones) {
+            List<ByteArray> zoneKeys = List.of(
+                    zoneDataNodesHistoryKey(zone.id()),
+                    zoneScaleUpTimerKey(zone.id()),
+                    zoneScaleDownTimerKey(zone.id()),
+                    zonePartitionResetTimerKey(zone.id())
+            );
+
+            allKeys.addAll(zoneKeys);
+            descriptors.put(zone.id(), zone);
+        }
+
+        return metaStorageManager.getAll(allKeys)
+                .thenApply(entriesMap -> {
+                    for (CatalogZoneDescriptor zone : descriptors.values()) {
+                        Entry historyEntry = 
entriesMap.get(zoneDataNodesHistoryKey(zone.id()));
+                        Entry scaleUpEntry = 
entriesMap.get(zoneScaleUpTimerKey(zone.id()));
+                        Entry scaleDownEntry = 
entriesMap.get(zoneScaleDownTimerKey(zone.id()));
+                        Entry partitionResetEntry = 
entriesMap.get(zonePartitionResetTimerKey(zone.id()));
+
+                        if (missingEntry(historyEntry)) {
+                            // Not critical because if we have no history in 
this map, we look into meta storage.
+                            LOG.warn("Couldn't recover data nodes history for 
zone [id={}, historyEntry={}].", zone.id(), historyEntry);
+
+                            continue;
+                        }
+
+                        if (missingEntry(scaleUpEntry) || 
missingEntry(scaleDownEntry) || missingEntry(partitionResetEntry)) {
+                            LOG.error("Couldn't recover timers for zone 
[id={}, name={}, scaleUpEntry={}, scaleDownEntry={}, "

Review Comment:
   replaced it with throwing the assertion. This is the start of the component, 
so in case of assertion error the node will not start



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to