denis-chudov commented on code in PR #5092:
URL: https://github.com/apache/ignite-3/pull/5092#discussion_r1967509143


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java:
##########
@@ -0,0 +1,1562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.lang.Math.max;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static org.apache.ignite.internal.catalog.descriptors.ConsistencyMode.HIGH_AVAILABILITY;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneTimer.DEFAULT_TIMER;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_SCALE_DOWN_TIMER_PREFIX;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_SCALE_UP_TIMER_PREFIX;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodeHistoryContextFromValues;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.nodeNames;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonePartitionResetTimerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractZoneId;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notTombstone;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
+import static org.apache.ignite.internal.util.CollectionUtils.union;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.distributionzones.DataNodesHistory.DataNodesHistorySerializer;
+import org.apache.ignite.internal.distributionzones.DistributionZoneTimer.DistributionZoneTimerSerializer;
+import org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DataNodesHistoryContext;
+import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Condition;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Manager for data nodes of distribution zones.
+ * <br>
+ * It is used by {@link DistributionZoneManager} to calculate data nodes, see {@link #dataNodes(int, HybridTimestamp)}.
+ * Data nodes are stored in meta storage as {@link DataNodesHistory} for each zone; each zone also has scale up and
+ * scale down timers, stored as {@link DistributionZoneTimer}. The partition reset timer is calculated on node recovery
+ * according to the scale down timer state, see {@link #restorePartitionResetTimer(int, DistributionZoneTimer, long)}.
+ * <br>
+ * Data nodes history is appended on topology changes, on zone filter changes and on zone auto adjust alterations
+ * (i.e. alterations of the scale up and scale down timer configuration), see the {@link #onTopologyChange},
+ * {@link #onZoneFilterChange} and {@link #onAutoAdjustAlteration} methods respectively.
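+ * <br>
+ * For each zone, the manager maintains the following meta storage keys: the data nodes history, the scale up timer,
+ * the scale down timer and the partition reset timer.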
+ */
+public class DataNodesManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(DataNodesManager.class);
+
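+    /** Maximum number of attempts when retrying a meta storage operation. */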
+    private static final int MAX_ATTEMPTS_ON_RETRY = 100;
+
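+    /** Meta storage manager. */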
+    private final MetaStorageManager metaStorageManager;
+
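+    /** Catalog manager. */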
+    private final CatalogManager catalogManager;
+
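+    /** Clock service. */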
+    private final ClockService clockService;
+
+    /** External busy lock. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /**
+     * Map with zone id as a key and set of zone timers as a value.
+     */
+    private final Map<Integer, ZoneTimers> zoneTimers = new ConcurrentHashMap<>();
+
+    /** Executor for scheduling tasks for scale up and scale down processes. */
+    private final StripedScheduledThreadPoolExecutor executor;
+
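+    /** Local node name. */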
+    private final String localNodeName;
+
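+    /** Meta storage watch listener for zone scale up timer keys. */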
+    private final WatchListener scaleUpTimerPrefixListener;
+
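+    /** Meta storage watch listener for zone scale down timer keys. */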
+    private final WatchListener scaleDownTimerPrefixListener;
+
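+    /** Meta storage watch listener for zone data nodes history keys. */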
+    private final WatchListener dataNodesListener;
+
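+    /** Volatile in-memory cache of data nodes history by zone id; meta storage remains the source of truth. */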
+    private final Map<Integer, DataNodesHistory> dataNodesHistoryVolatile = new ConcurrentHashMap<>();
+
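+    /** Closure that resets partitions for a zone; accepts the meta storage revision and the zone id. */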
+    private final BiConsumer<Long, Integer> partitionResetClosure;
+
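+    /** Supplier of the partition distribution reset timeout, in seconds. */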
+    private final IntSupplier partitionDistributionResetTimeoutSupplier;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Local node name.
+     * @param busyLock External busy lock.
+     * @param metaStorageManager Meta storage manager.
+     * @param catalogManager Catalog manager.
+     * @param clockService Clock service.
+     * @param partitionResetClosure Closure to reset partitions.
+     * @param partitionDistributionResetTimeoutSupplier Supplier for partition distribution reset timeout.
+     */
+    public DataNodesManager(
+            String nodeName,
+            IgniteSpinBusyLock busyLock,
+            MetaStorageManager metaStorageManager,
+            CatalogManager catalogManager,
+            ClockService clockService,
+            BiConsumer<Long, Integer> partitionResetClosure,
+            IntSupplier partitionDistributionResetTimeoutSupplier
+    ) {
+        this.metaStorageManager = metaStorageManager;
+        this.catalogManager = catalogManager;
+        this.clockService = clockService;
+        this.localNodeName = nodeName;
+        this.partitionResetClosure = partitionResetClosure;
+        this.partitionDistributionResetTimeoutSupplier = partitionDistributionResetTimeoutSupplier;
+        this.busyLock = busyLock;
+
+        executor = createZoneManagerExecutor(
+                Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
+                NamedThreadFactory.create(nodeName, "dst-zones-scheduler", LOG)
+        );
+
+        scaleUpTimerPrefixListener = createScaleUpTimerPrefixListener();
+        scaleDownTimerPrefixListener = createScaleDownTimerPrefixListener();
+        dataNodesListener = createDataNodesListener();
+    }
+
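+    /**
+     * Starts the manager: registers meta storage watch listeners and restores the volatile data nodes state and timers
+     * for the known zones.
+     *
+     * @param knownZones Zone descriptors known at startup.
+     * @param recoveryRevision Meta storage revision used for recovery.
+     * @return Future that completes when recovery is done.
+     */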
+    CompletableFuture<Void> startAsync(
+            Collection<CatalogZoneDescriptor> knownZones,
+            long recoveryRevision
+    ) {
+        metaStorageManager.registerPrefixWatch(zoneScaleUpTimerPrefix(), scaleUpTimerPrefixListener);
+        metaStorageManager.registerPrefixWatch(zoneScaleDownTimerPrefix(), scaleDownTimerPrefixListener);
+        metaStorageManager.registerPrefixWatch(zoneDataNodesHistoryPrefix(), dataNodesListener);
+
+        if (knownZones.isEmpty()) {
+            return nullCompletedFuture();
+        }
+
+        Set<ByteArray> allKeys = new HashSet<>();
+        Map<Integer, CatalogZoneDescriptor> descriptors = new HashMap<>();
+
+        for (CatalogZoneDescriptor zone : knownZones) {
+            allKeys.add(zoneDataNodesHistoryKey(zone.id()));
+            allKeys.add(zoneScaleUpTimerKey(zone.id()));
+            allKeys.add(zoneScaleDownTimerKey(zone.id()));
+            allKeys.add(zonePartitionResetTimerKey(zone.id()));
+
+            descriptors.put(zone.id(), zone);
+        }
+
+        return metaStorageManager.getAll(allKeys)
+                .thenAccept(entriesMap -> {
+                    for (CatalogZoneDescriptor zone : descriptors.values()) {
+                        Entry historyEntry = entriesMap.get(zoneDataNodesHistoryKey(zone.id()));
+                        Entry scaleUpEntry = entriesMap.get(zoneScaleUpTimerKey(zone.id()));
+                        Entry scaleDownEntry = entriesMap.get(zoneScaleDownTimerKey(zone.id()));
+                        Entry partitionResetEntry = entriesMap.get(zonePartitionResetTimerKey(zone.id()));
+
+                        if (missingEntry(historyEntry)) {
+                            // Not critical because if we have no history in this map, we look into meta storage.
+                            LOG.warn("Couldn't recover data nodes history for zone [id={}, historyEntry={}].", zone.id(), historyEntry);
+
+                            continue;
+                        }
+
+                        if (missingEntry(scaleUpEntry) || missingEntry(scaleDownEntry) || missingEntry(partitionResetEntry)) {
+                            throw new AssertionError(format("Couldn't recover timers for zone [id={}, name={}, scaleUpEntry={}, "
+                                    + "scaleDownEntry={}, partitionResetEntry={}]", zone.id(), zone.name(), scaleUpEntry,
+                                    scaleDownEntry, partitionResetEntry));
+                        }
+
+                        DataNodesHistory history = DataNodesHistorySerializer.deserialize(historyEntry.value());
+                        dataNodesHistoryVolatile.put(zone.id(), history);
+
+                        DistributionZoneTimer scaleUpTimer = DistributionZoneTimerSerializer.deserialize(scaleUpEntry.value());
+                        DistributionZoneTimer scaleDownTimer = DistributionZoneTimerSerializer.deserialize(scaleDownEntry.value());
+
+                        onScaleUpTimerChange(zone, scaleUpTimer);
+                        onScaleDownTimerChange(zone, scaleDownTimer);
+                        restorePartitionResetTimer(zone.id(), scaleDownTimer, recoveryRevision);
+                    }
+                });
+    }
+
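+    /** Returns {@code true} if the given entry is absent or is a tombstone. */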
+    private static boolean missingEntry(Entry e) {
+        return e.empty() || e.tombstone();
+    }
+
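+    /** Stops all zone timers and shuts down the scheduled executor. */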
+    void stop() {
+        zoneTimers.forEach((k, zt) -> zt.stopAllTimers());
+
+        shutdownAndAwaitTermination(executor, 10, SECONDS);
+    }
+
+    /**
+     * Recalculates data nodes on topology changes, modifies scale up and scale down timers and appends data nodes history in meta
+     * storage. It takes the current data nodes according to the timestamp, compares them with the new topology and calculates the
+     * added and removed nodes; it also compares the new topology with the old one and calculates the nodes added relative to the old
+     * topology (if that set is not empty, a history entry is always added).
+     * <br>
+     * Added nodes and removed nodes are added to the scale up and scale down timers respectively. Their time to trigger is refreshed
+     * according to the current zone descriptor. If their time to trigger is less than or equal to the current timestamp (including
+     * the cases when the auto adjust timeout is immediate), they are applied automatically to the new history entry; in that case
+     * their value in meta storage is set to {@link DistributionZoneTimer#DEFAULT_TIMER}.
+     * <br>
+     * For example:
+     * <ul>
+     *     <li>there are nodes A, B in the latest data nodes history entry, and its timestamp is 1;</li>
+     *     <li>scale up auto adjust is 5, scale down auto adjust is 100 (let's say that the time units for auto adjust are the same
+     *     as those used for timestamps);</li>
+     *     <li>node A leaves, node C joins at timestamp 10;</li>
+     *     <li>onTopologyChange is triggered and sets the scale up timer to 15 and the scale down timer to 110;</li>
+     *     <li>for some reason the scheduled executor hangs and doesn't trigger the timers (let's consider the case when everything
+     *     is recalculated in onTopologyChange);</li>
+     *     <li>node B leaves at time 20;</li>
+     *     <li>onTopologyChange is triggered again and sees that the scale up timer should have already been triggered (its time to
+     *     trigger was 15), so it adds node C automatically to the new history entry. The scale down timer is scheduled and has to
+     *     wait longer, but another node (B) has left, so the scale down timer's nodes are now A and B, and its time to trigger is
+     *     rescheduled to 20 + 100 = 120. Data nodes in the new data nodes history entry are A, B, C.</li>
+     * </ul>
+     *
+     * @param zoneDescriptor Zone descriptor.
+     * @param revision Meta storage revision.
+     * @param timestamp Timestamp consistent with the meta storage revision.
+     * @param newLogicalTopology New logical topology.
+     * @param oldLogicalTopology Old logical topology.
+     * @return CompletableFuture that is completed when the operation is done.
+     */
+    CompletableFuture<Void> onTopologyChange(
+            CatalogZoneDescriptor zoneDescriptor,
+            long revision,
+            HybridTimestamp timestamp,
+            Set<NodeWithAttributes> newLogicalTopology,
+            Set<NodeWithAttributes> oldLogicalTopology
+    ) {
+        int zoneId = zoneDescriptor.id();
+
+        return doOperation(
+                zoneDescriptor,
+                List.of(zoneDataNodesHistoryKey(zoneId), zoneScaleUpTimerKey(zoneId), zoneScaleDownTimerKey(zoneId)),
+                dataNodesHistoryContext -> completedFuture(onTopologyChangeInternal(
+                        zoneDescriptor,
+                        revision,
+                        timestamp,
+                        newLogicalTopology,
+                        oldLogicalTopology,
+                        dataNodesHistoryContext
+                ))
+        );
+    }
+
+    @Nullable
+    private DataNodesHistoryMetaStorageOperation onTopologyChangeInternal(
+            CatalogZoneDescriptor zoneDescriptor,
+            long revision,
+            HybridTimestamp timestamp,
+            Set<NodeWithAttributes> newLogicalTopology,
+            Set<NodeWithAttributes> oldLogicalTopology,
+            @Nullable DistributionZonesUtil.DataNodesHistoryContext dataNodesHistoryContext
+    ) {
+        if (dataNodesHistoryContext == null) {
+            // This means that the zone was not initialized yet. The initial history entry with the current topology
+            // will be written on zone init.
+            return null;
+        }
+
+        DataNodesHistory dataNodesHistory = dataNodesHistoryContext.dataNodesHistory();
+
+        if (dataNodesHistory.entryIsPresentAtExactTimestamp(timestamp)) {
+            // This event was already processed by another node.
+            return null;
+        }
+
+        int zoneId = zoneDescriptor.id();
+
+        LOG.debug("Topology change detected [zoneId={}, timestamp={}, 
newTopology={}, oldTopology={}].", zoneId, timestamp,
+                nodeNames(newLogicalTopology), nodeNames(oldLogicalTopology));
+
+        DistributionZoneTimer scaleUpTimer = 
dataNodesHistoryContext.scaleUpTimer();
+        DistributionZoneTimer scaleDownTimer = 
dataNodesHistoryContext.scaleDownTimer();
+
+        DataNodesHistoryEntry latestDataNodes = dataNodesHistory.dataNodesForTimestamp(timestamp);
+
+        Set<NodeWithAttributes> addedNodes = newLogicalTopology.stream()
+                .filter(node -> !latestDataNodes.dataNodes().contains(node))
+                .collect(toSet());
+
+        Set<NodeWithAttributes> addedNodesComparingToOldTopology = newLogicalTopology.stream()
+                .filter(node -> !oldLogicalTopology.contains(node))
+                .collect(toSet());
+
+        Set<NodeWithAttributes> removedNodes = latestDataNodes.dataNodes().stream()
+                .filter(node -> !newLogicalTopology.contains(node) && !Objects.equals(node.nodeName(), localNodeName))
+                .filter(node -> !scaleDownTimer.nodes().contains(node))
+                .collect(toSet());
+
+        if ((!addedNodes.isEmpty() || !removedNodes.isEmpty())
+                && zoneDescriptor.dataNodesAutoAdjust() != INFINITE_TIMER_VALUE) {
+            // TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
+            throw new UnsupportedOperationException("Data nodes auto adjust is not supported.");
+        }
+
+        int partitionResetDelay = partitionDistributionResetTimeoutSupplier.getAsInt();
+
+        if (!removedNodes.isEmpty()
+                && zoneDescriptor.consistencyMode() == HIGH_AVAILABILITY
+                && partitionResetDelay != INFINITE_TIMER_VALUE) {
+            reschedulePartitionReset(partitionResetDelay, () -> partitionResetClosure.accept(revision, zoneId), zoneId);
+        }
+
+        DistributionZoneTimer mergedScaleUpTimer = mergeTimerOnTopologyChange(zoneDescriptor, timestamp,
+                scaleUpTimer, addedNodes, newLogicalTopology, true);
+
+        DistributionZoneTimer mergedScaleDownTimer = mergeTimerOnTopologyChange(zoneDescriptor, timestamp,
+                scaleDownTimer, removedNodes, newLogicalTopology, false);
+
+        DataNodesHistoryEntry currentDataNodes = currentDataNodes(
+                timestamp,
+                dataNodesHistory,
+                mergedScaleUpTimer,
+                mergedScaleDownTimer,
+                zoneDescriptor
+        );
+
+        DistributionZoneTimer scaleUpTimerToSave = timerToSave(timestamp, mergedScaleUpTimer);
+        DistributionZoneTimer scaleDownTimerToSave = timerToSave(timestamp, mergedScaleDownTimer);
+
+        boolean addMandatoryEntry = !addedNodesComparingToOldTopology.isEmpty();
+
+        Condition condition = and(
+                dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory),
+                and(
+                        timerEqualToOrNotExists(zoneScaleUpTimerKey(zoneId), scaleUpTimer),
+                        timerEqualToOrNotExists(zoneScaleDownTimerKey(zoneId), scaleDownTimer)
+                )
+        );
+
+        Operations operations = ops(
+                addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, currentDataNodes.timestamp(),
+                        currentDataNodes.dataNodes(), addMandatoryEntry),
+                renewTimer(zoneScaleUpTimerKey(zoneId), scaleUpTimerToSave),
+                renewTimer(zoneScaleDownTimerKey(zoneId), scaleDownTimerToSave)
+        );
+
+        return DataNodesHistoryMetaStorageOperation.builder()
+                .zoneId(zoneId)
+                .condition(condition)
+                .operations(operations)
+                .operationName("topology change")
+                .currentDataNodesHistory(dataNodesHistory)
+                .currentTimestamp(timestamp)
+                .historyEntryTimestamp(currentDataNodes.timestamp())
+                .historyEntryNodes(currentDataNodes.dataNodes())
+                .scaleUpTimer(scaleUpTimerToSave)
+                .scaleDownTimer(scaleDownTimerToSave)
+                .addMandatoryEntry(addMandatoryEntry)
+                .build();
+    }
+
+    /**
+     * This is called on topology change, when nodes may have been added or removed and therefore scale up or scale down
+     * timers may need to be created. By this moment there may already be timers in meta storage, so this method merges
+     * them with the new topology changes. The create time of the merged timer is set to the current timestamp, and the
+     * sets of nodes of the two timers are unified, taking the logical topology into account: a node absent from the
+     * topology is excluded from the scale up timer, and a node present in it is excluded from the scale down timer.
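+     * <p>
+     * For example (illustrative): if the scale down timer in meta storage already holds node A and node B also leaves
+     * at the new timestamp, the merged scale down timer holds both A and B, and its create time is the new timestamp.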
+     *
+     * @param zoneDescriptor Zone descriptor.
+     * @param timestamp Current timestamp.
+     * @param currentTimer Current timer, that is taken from meta storage.
+     * @param nodes Set of nodes that are added or removed; which one is defined by the {@code scaleUp} parameter.
+     * @param logicalTopology Current logical topology.
+     * @param scaleUp If {@code true}, the timer is the scale up timer, otherwise the scale down timer.
+     * @return Merged timer.
+     */
+    private static DistributionZoneTimer mergeTimerOnTopologyChange(
+            CatalogZoneDescriptor zoneDescriptor,
+            HybridTimestamp timestamp,
+            DistributionZoneTimer currentTimer,
+            Set<NodeWithAttributes> nodes,
+            Set<NodeWithAttributes> logicalTopology,
+            boolean scaleUp
+    ) {
+        // Filter the current timer's nodes according to the current topology, if it is newer than the timer's timestamp.
+        Set<NodeWithAttributes> currentTimerFilteredNodes = currentTimer.nodes().stream()
+                .filter(n -> {
+                    if (currentTimer.createTimestamp().longValue() >= timestamp.longValue()) {
+                        return true;
+                    } else {
+                        return scaleUp == nodeNames(logicalTopology).contains(n.nodeName());
+                    }
+                })
+                .collect(toSet());
+
+        if (nodes.isEmpty()) {
+            return new DistributionZoneTimer(
+                    currentTimer.createTimestamp(),
+                    currentTimer.timeToWaitInSeconds(),
+                    currentTimerFilteredNodes
+            );
+        } else {
+            int autoAdjustWaitInSeconds = scaleUp
+                    ? zoneDescriptor.dataNodesAutoAdjustScaleUp()
+                    : zoneDescriptor.dataNodesAutoAdjustScaleDown();
+
+            return new DistributionZoneTimer(
+                    timestamp,
+                    autoAdjustWaitInSeconds,
+                    union(nodes, currentTimerFilteredNodes)
+            );
+        }
+    }
+
+    /**
+     * Returns the timer value to save in the meta storage. If the timer has already been applied according to the
+     * current timestamp, returns the default timer.
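+     * <p>
+     * For example (illustrative): with a current timestamp of 20, a timer whose time to trigger is 15 is considered
+     * already applied and is saved as {@link DistributionZoneTimer#DEFAULT_TIMER}, while a timer whose time to trigger
+     * is 25 is saved as is.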
+     *
+     * @param currentTimestamp Current timestamp.
+     * @param timer Timer.
+     * @return Timer to save.
+     */
+    private static DistributionZoneTimer timerToSave(HybridTimestamp currentTimestamp, DistributionZoneTimer timer) {
+        return timer.timeToTrigger().longValue() <= currentTimestamp.longValue() ? DEFAULT_TIMER : timer;
+    }
+
+    /**
+     * Recalculates data nodes on zone filter changes according only to the current topology and the new filters. Stops and discards
+     * all timers.
+     *
+     * @param zoneDescriptor Zone descriptor.
+     * @param timestamp Current timestamp.
+     * @param logicalTopology New logical topology.
+     * @return CompletableFuture that is completed when the operation is done.
+     */
+    CompletableFuture<Void> onZoneFilterChange(
+            CatalogZoneDescriptor zoneDescriptor,
+            HybridTimestamp timestamp,
+            Set<NodeWithAttributes> logicalTopology
+    ) {
+        int zoneId = zoneDescriptor.id();
+
+        return doOperation(
+                zoneDescriptor,
+                List.of(zoneDataNodesHistoryKey(zoneId)),
+                dataNodesHistoryContext -> completedFuture(onZoneFilterChangeInternal(
+                        zoneDescriptor,
+                        timestamp,
+                        logicalTopology,
+                        dataNodesHistoryContext
+                ))
+        );
+    }
+
+    @Nullable
+    private DataNodesHistoryMetaStorageOperation onZoneFilterChangeInternal(
+            CatalogZoneDescriptor zoneDescriptor,
+            HybridTimestamp timestamp,
+            Set<NodeWithAttributes> logicalTopology,
+            DataNodesHistoryContext dataNodesHistoryContext
+    ) {
+        assert dataNodesHistoryContext != null : "Data nodes history and timers are missing, zone=" + zoneDescriptor;
+
+        DataNodesHistory dataNodesHistory = dataNodesHistoryContext.dataNodesHistory();
+
+        if (dataNodesHistory.entryIsPresentAtExactTimestamp(timestamp)) {
+            return null;
+        }
+
+        int zoneId = zoneDescriptor.id();
+
+        LOG.debug("Distribution zone filter changed [zoneId={}, timestamp={}, 
logicalTopology={}, descriptor={}].", zoneId,
+                timestamp, nodeNames(logicalTopology), zoneDescriptor);
+
+        stopAllTimers(zoneId);
+
+        Set<NodeWithAttributes> dataNodes = filterDataNodes(logicalTopology, zoneDescriptor);
+
+        return DataNodesHistoryMetaStorageOperation.builder()
+                .zoneId(zoneId)
+                .condition(dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory))
+                .operations(ops(
+                        addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, timestamp, dataNodes),
+                        clearTimer(zoneScaleUpTimerKey(zoneId)),
+                        clearTimer(zoneScaleDownTimerKey(zoneId)),
+                        clearTimer(zonePartitionResetTimerKey(zoneId))
+                ))
+                .operationName("distribution zone filter change")
+                .currentDataNodesHistory(dataNodesHistory)
+                .currentTimestamp(timestamp)
+                .historyEntryTimestamp(timestamp)
+                .historyEntryNodes(dataNodes)
+                .scaleUpTimer(DEFAULT_TIMER)
+                .scaleDownTimer(DEFAULT_TIMER)
+                .build();
+    }
+
+    /**
+     * Recalculates data nodes on zone auto adjust alterations. Modifies scale up and scale down timers and appends data nodes
+     * history. The new data nodes are calculated according to the new timer values. See also the description of
+     * {@link #onTopologyChange} for more details and examples of how data nodes are recalculated.
+     *
+     * @param zoneDescriptor Zone descriptor.
+     * @param timestamp Current timestamp.
+     * @param oldAutoAdjustScaleUp Old scale up auto adjust value.
+     * @param oldAutoAdjustScaleDown Old scale down auto adjust value.
+     * @return CompletableFuture that is completed when the operation is done.
+     */
+    CompletableFuture<Void> onAutoAdjustAlteration(
+            CatalogZoneDescriptor zoneDescriptor,
+            HybridTimestamp timestamp,
+            int oldAutoAdjustScaleUp,
+            int oldAutoAdjustScaleDown
+    ) {
+        int zoneId = zoneDescriptor.id();
+
+        return doOperation(
+                zoneDescriptor,
+                List.of(zoneDataNodesHistoryKey(zoneId), zoneScaleUpTimerKey(zoneId), zoneScaleDownTimerKey(zoneId)),
+                dataNodesHistoryContext -> completedFuture(onAutoAdjustAlterationInternal(
+                        zoneDescriptor,
+                        timestamp,
+                        oldAutoAdjustScaleUp,
+                        oldAutoAdjustScaleDown,
+                        dataNodesHistoryContext
+                ))
+        );
+    }
+
+    @Nullable
+    private DataNodesHistoryMetaStorageOperation onAutoAdjustAlterationInternal(
+            CatalogZoneDescriptor zoneDescriptor,
+            HybridTimestamp timestamp,
+            int oldAutoAdjustScaleUp,
+            int oldAutoAdjustScaleDown,
+            DataNodesHistoryContext dataNodesHistoryContext
+    ) {
+        assert dataNodesHistoryContext != null : "Data nodes history and timers are missing, zone=" + zoneDescriptor;
+
+        DataNodesHistory dataNodesHistory = dataNodesHistoryContext.dataNodesHistory();
+
+        if (dataNodesHistory.entryIsPresentAtExactTimestamp(timestamp)) {
+            return null;
+        }
+
+        int zoneId = zoneDescriptor.id();
+
+        LOG.debug("Distribution zone auto adjust changed [zoneId={}, 
timestamp={}, oldAutoAdjustScaleUp={}, "
+                        + ", oldAutoAdjustScaleDown={}, descriptor={}].",
+                zoneId, timestamp, oldAutoAdjustScaleUp, 
oldAutoAdjustScaleDown, zoneDescriptor);
+
+        DistributionZoneTimer scaleUpTimer = dataNodesHistoryContext.scaleUpTimer();
+        DistributionZoneTimer scaleDownTimer = dataNodesHistoryContext.scaleDownTimer();
+
+        DistributionZoneTimer modifiedScaleUpTimer = scaleUpTimer
+                .modifyTimeToWait(zoneDescriptor.dataNodesAutoAdjustScaleUp());
+
+        DistributionZoneTimer modifiedScaleDownTimer = scaleDownTimer
+                .modifyTimeToWait(zoneDescriptor.dataNodesAutoAdjustScaleDown());
+
+        DataNodesHistoryEntry currentDataNodes = currentDataNodes(
+                timestamp,
+                dataNodesHistory,
+                modifiedScaleUpTimer,
+                modifiedScaleDownTimer,
+                zoneDescriptor
+        );
+
+        DistributionZoneTimer scaleUpTimerToSave = timerToSave(timestamp, modifiedScaleUpTimer);
+        DistributionZoneTimer scaleDownTimerToSave = timerToSave(timestamp, modifiedScaleDownTimer);
+
+        Condition condition = and(
+                dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory),
+                and(
+                        timerEqualToOrNotExists(zoneScaleUpTimerKey(zoneId), scaleUpTimer),
+                        timerEqualToOrNotExists(zoneScaleDownTimerKey(zoneId), scaleDownTimer)
+                )
+        );
+
+        Operations operations = ops(
+                addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, currentDataNodes.timestamp(),
+                        currentDataNodes.dataNodes()),
+                renewTimer(zoneScaleUpTimerKey(zoneId), scaleUpTimerToSave),
+                renewTimer(zoneScaleDownTimerKey(zoneId), scaleDownTimerToSave)
+        );
+
+        return DataNodesHistoryMetaStorageOperation.builder()
+                .zoneId(zoneId)
+                .condition(condition)
+                .operations(operations)
+                .operationName("distribution zone auto adjust change")
+                .currentDataNodesHistory(dataNodesHistory)
+                .currentTimestamp(timestamp)
+                .historyEntryTimestamp(currentDataNodes.timestamp())
+                .historyEntryNodes(currentDataNodes.dataNodes())
+                .scaleUpTimer(scaleUpTimerToSave)
+                .scaleDownTimer(scaleDownTimerToSave)
+                .build();
+    }
+
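+    /**
+     * Reschedules or stops the partition reset timer when the partition distribution reset timeout changes.
+     *
+     * @param zoneId Zone id.
+     * @param partitionDistributionResetTimeoutSeconds New timeout, in seconds; {@code INFINITE_TIMER_VALUE} stops the timer.
+     * @param taskOnReset Task to run when the timer fires.
+     */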
+    void onUpdatePartitionDistributionReset(
+            int zoneId,
+            int partitionDistributionResetTimeoutSeconds,
+            Runnable taskOnReset
+    ) {
+        if (partitionDistributionResetTimeoutSeconds == INFINITE_TIMER_VALUE) {
+            zoneTimers.computeIfAbsent(zoneId, this::createZoneTimers).partitionReset.stopScheduledTask();
+        } else {
+            zoneTimers.computeIfAbsent(zoneId, this::createZoneTimers).partitionReset
+                    .reschedule(partitionDistributionResetTimeoutSeconds, taskOnReset);
+        }
+    }
+
+    /**
+     * The closure that is executed on the scale up or scale down schedule. Appends the data nodes history in meta storage and sets
+     * the timer to the default value.
+     *
+     * @param zoneDescriptor Zone descriptor.
+     * @param scheduledTimer Scheduled timer.
+     * @return Runnable that is executed on schedule.
+     */
+    private Runnable applyTimerClosure(CatalogZoneDescriptor zoneDescriptor, ScheduledTimer scheduledTimer) {
+        int zoneId = zoneDescriptor.id();
+
+        return () -> doOperation(
+                zoneDescriptor,
+                List.of(zoneDataNodesHistoryKey(zoneId), scheduledTimer.metaStorageKey()),
+                dataNodesHistoryContext -> applyTimerClosure0(zoneDescriptor, scheduledTimer, dataNodesHistoryContext)
+        );
+    }
+
+    @Nullable

Review Comment:
   removed


