denis-chudov commented on code in PR #5092:
URL: https://github.com/apache/ignite-3/pull/5092#discussion_r1960134658


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java:
##########
@@ -0,0 +1,1492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.lang.Math.max;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static org.apache.ignite.internal.catalog.descriptors.ConsistencyMode.HIGH_AVAILABILITY;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneTimer.DEFAULT_TIMER;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_DATA_NODES_HISTORY_PREFIX;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_SCALE_DOWN_TIMER_PREFIX;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DISTRIBUTION_ZONE_SCALE_UP_TIMER_PREFIX;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodeHistoryContextFromValues;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deserializeLogicalTopologySet;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.nodeNames;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonePartitionResetTimerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractZoneId;
+import static org.apache.ignite.internal.lang.Pair.pair;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notTombstone;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
+import static org.apache.ignite.internal.util.CollectionUtils.union;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.distributionzones.DataNodesHistory.DataNodesHistorySerializer;
+import org.apache.ignite.internal.distributionzones.DistributionZoneTimer.DistributionZoneTimerSerializer;
+import org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DataNodeHistoryContext;
+import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.lang.Pair;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Condition;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Manager for data nodes of distribution zones.
+ * <br>
+ * It is used by {@link DistributionZoneManager} to calculate data nodes, see {@link #dataNodes(int, HybridTimestamp)}.
+ * Data nodes are stored in meta storage as a {@link DataNodesHistory} per zone; each zone also has scale up and scale down
+ * timers, stored as {@link DistributionZoneTimer}. The partition reset timer is calculated on node recovery from the state of
+ * the scale down timer, see {@link #restorePartitionResetTimer(int, DistributionZoneTimer, long)}.
+ * <br>
+ * Data nodes history is appended on topology changes, on zone filter changes and on zone auto adjust alterations (i.e. changes
+ * of the scale up and scale down timer configuration), see {@link #onTopologyChange}, {@link #onZoneFilterChange} and
+ * {@link #onAutoAdjustAlteration} respectively.
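+ * <br>
+ * For illustration, resolving the data nodes that were effective at a given timestamp boils down to a lookup in the history
+ * (a minimal sketch; variable names are illustrative, and node attribute filtering and pending timers are ignored here):
+ * <pre>{@code
+ * Pair<HybridTimestamp, Set<NodeWithAttributes>> entry = dataNodesHistory.dataNodesForTimestamp(timestamp);
+ * Set<NodeWithAttributes> nodesAtTimestamp = entry.getSecond();
+ * }</pre>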
+ */
+public class DataNodesManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(DataNodesManager.class);
+
+    private static final int MAX_ATTEMPTS_ON_RETRY = 100;
+
+    private final MetaStorageManager metaStorageManager;
+
+    private final CatalogManager catalogManager;
+
+    private final ClockService clockService;
+
+    /** External busy lock. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /**
+     * Map with zone id as a key and set of zone timers as a value.
+     */
+    private final Map<Integer, ZoneTimers> zoneTimers = new ConcurrentHashMap<>();
+
+    /** Executor for scheduling tasks for scale up and scale down processes. */
+    private final StripedScheduledThreadPoolExecutor executor;
+
+    private final String localNodeName;
+
+    private final WatchListener scaleUpTimerPrefixListener;
+
+    private final WatchListener scaleDownTimerPrefixListener;
+
+    private final WatchListener dataNodesListener;
+
+    private final Map<Integer, DataNodesHistory> dataNodesHistoryVolatile = new ConcurrentHashMap<>();
+
+    private final BiConsumer<Long, Integer> partitionResetClosure;
+
+    private final IntSupplier partitionDistributionResetTimeoutSupplier;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Local node name.
+     * @param busyLock External busy lock.
+     * @param metaStorageManager Meta storage manager.
+     * @param catalogManager Catalog manager.
+     * @param clockService Clock service.
+     * @param partitionResetClosure Closure to reset partitions.
+     * @param partitionDistributionResetTimeoutSupplier Supplier for partition distribution reset timeout.
+     */
+    public DataNodesManager(
+            String nodeName,
+            IgniteSpinBusyLock busyLock,
+            MetaStorageManager metaStorageManager,
+            CatalogManager catalogManager,
+            ClockService clockService,
+            BiConsumer<Long, Integer> partitionResetClosure,
+            IntSupplier partitionDistributionResetTimeoutSupplier
+    ) {
+        this.metaStorageManager = metaStorageManager;
+        this.catalogManager = catalogManager;
+        this.clockService = clockService;
+        this.localNodeName = nodeName;
+        this.partitionResetClosure = partitionResetClosure;
+        this.partitionDistributionResetTimeoutSupplier = partitionDistributionResetTimeoutSupplier;
+        this.busyLock = busyLock;
+
+        executor = createZoneManagerExecutor(
+                Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
+                NamedThreadFactory.create(nodeName, "dst-zones-scheduler", LOG)
+        );
+
+        scaleUpTimerPrefixListener = createScaleUpTimerPrefixListener();
+        scaleDownTimerPrefixListener = createScaleDownTimerPrefixListener();
+        dataNodesListener = createDataNodesListener();
+    }
+
+    CompletableFuture<Void> startAsync(
+            Collection<CatalogZoneDescriptor> knownZones,
+            long recoveryRevision
+    ) {
+        metaStorageManager.registerPrefixWatch(zoneScaleUpTimerPrefix(), scaleUpTimerPrefixListener);
+        metaStorageManager.registerPrefixWatch(zoneScaleDownTimerPrefix(), scaleDownTimerPrefixListener);
+        metaStorageManager.registerPrefixWatch(zoneDataNodesHistoryPrefix(), dataNodesListener);
+
+        if (knownZones.isEmpty()) {
+            return nullCompletedFuture();
+        }
+
+        Set<ByteArray> allKeys = new HashSet<>();
+        Map<Integer, CatalogZoneDescriptor> descriptors = new HashMap<>();
+
+        for (CatalogZoneDescriptor zone : knownZones) {
+            List<ByteArray> zoneKeys = List.of(
+                    zoneDataNodesHistoryKey(zone.id()),
+                    zoneScaleUpTimerKey(zone.id()),
+                    zoneScaleDownTimerKey(zone.id()),
+                    zonePartitionResetTimerKey(zone.id())
+            );
+
+            allKeys.addAll(zoneKeys);
+            descriptors.put(zone.id(), zone);
+        }
+
+        return metaStorageManager.getAll(allKeys)
+                .thenApply(entriesMap -> {
+                    for (CatalogZoneDescriptor zone : descriptors.values()) {
+                        Entry historyEntry = entriesMap.get(zoneDataNodesHistoryKey(zone.id()));
+                        Entry scaleUpEntry = entriesMap.get(zoneScaleUpTimerKey(zone.id()));
+                        Entry scaleDownEntry = entriesMap.get(zoneScaleDownTimerKey(zone.id()));
+                        Entry partitionResetEntry = entriesMap.get(zonePartitionResetTimerKey(zone.id()));
+
+                        if (missingEntry(historyEntry)) {
+                            // Not critical because if we have no history in this map, we look into meta storage.
+                            LOG.warn("Couldn't recover data nodes history for zone [id={}, historyEntry={}].", zone.id(), historyEntry);
+
+                            continue;
+                        }
+
+                        if (missingEntry(scaleUpEntry) || missingEntry(scaleDownEntry) || missingEntry(partitionResetEntry)) {
+                            LOG.error("Couldn't recover timers for zone [id={}, name={}, scaleUpEntry={}, scaleDownEntry={}, "
+                                    + "partitionResetEntry={}].", zone.id(), zone.name(), scaleUpEntry, scaleDownEntry, partitionResetEntry);
+
+                            continue;
+                        }
+
+                        DataNodesHistory history = DataNodesHistorySerializer.deserialize(historyEntry.value());
+                        dataNodesHistoryVolatile.put(zone.id(), history);
+
+                        DistributionZoneTimer scaleUpTimer = DistributionZoneTimerSerializer.deserialize(scaleUpEntry.value());
+                        DistributionZoneTimer scaleDownTimer = DistributionZoneTimerSerializer.deserialize(scaleDownEntry.value());
+
+                        onScaleUpTimerChange(zone, scaleUpTimer);
+                        onScaleDownTimerChange(zone, scaleDownTimer);
+                        restorePartitionResetTimer(zone.id(), scaleDownTimer, recoveryRevision);
+                    }
+
+                    return null;
+                });
+    }
+
+    private static boolean missingEntry(Entry e) {
+        return e.empty() || e.tombstone();
+    }
+
+    void stop() {
+        zoneTimers.forEach((k, zt) -> zt.stopAllTimers());
+
+        shutdownAndAwaitTermination(executor, 10, SECONDS);
+    }
+
+    /**
+     * Recalculates data nodes on topology changes, modifies the scale up and scale down timers and appends the data nodes history
+     * in meta storage. It takes the current data nodes according to the timestamp, compares them with the new topology to calculate
+     * the added and removed nodes, and also compares the new topology with the old one to calculate the nodes added relative to the
+     * old topology (if those are not empty, a history entry is always added).
+     * <br>
+     * Added and removed nodes are put into the scale up and scale down timers respectively. Their time to trigger is refreshed
+     * according to the current zone descriptor. If the time to trigger is less than or equal to the current timestamp (including
+     * the case when the auto adjust timeout is immediate), the nodes are applied directly to the new history entry and the timer's
+     * value in meta storage is set to {@link DistributionZoneTimer#DEFAULT_TIMER}.
+     * <br>
+     * For example (a short arithmetic summary follows the list):
+     * <ul>
+     *     <li>there are nodes A, B in the latest data nodes history entry, its timestamp is 1;</li>
+     *     <li>scale up auto adjust is 5, scale down auto adjust is 100 (let's say the time units for auto adjust are the same as
+     *     those used for timestamps);</li>
+     *     <li>node A leaves and node C joins at timestamp 10;</li>
+     *     <li>onTopologyChange is triggered and sets the scale up timer to 15 and the scale down timer to 110;</li>
+     *     <li>for some reason the scheduled executor hangs and doesn't trigger the timers (let's consider the case when everything
+     *     is recalculated in onTopologyChange);</li>
+     *     <li>node B leaves at timestamp 20;</li>
+     *     <li>onTopologyChange is triggered again and sees that the scale up timer should have already been triggered (its time to
+     *     trigger was 15), so it adds node C directly to the new history entry. The scale down timer is still scheduled and has to
+     *     wait longer, but another node (B) has left, so the scale down timer's nodes are now A and B, and its time to trigger is
+     *     rescheduled to 20 + 100 = 120. Data nodes in the new data nodes history entry are A, B, C.</li>
+     * </ul>
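+     * In plain numbers, using the values from the list above:
+     * <pre>
+     * scale up time to trigger   = 10 (topology change at 10) + 5   (scale up auto adjust)   = 15  -- already passed at timestamp 20
+     * scale down time to trigger = 20 (node B leaves at 20)   + 100 (scale down auto adjust) = 120 -- still pending
+     * </pre>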
+     *
+     * @param zoneDescriptor Zone descriptor.
+     * @param revision Meta storage revision.
+     * @param timestamp Timestamp that is consistent with the meta storage revision.
+     * @param newLogicalTopology New logical topology.
+     * @param oldLogicalTopology Old logical topology.
+     * @return CompletableFuture that is completed when the operation is done.
+     */
+    CompletableFuture<Void> onTopologyChange(
+            CatalogZoneDescriptor zoneDescriptor,
+            long revision,
+            HybridTimestamp timestamp,
+            Set<NodeWithAttributes> newLogicalTopology,
+            Set<NodeWithAttributes> oldLogicalTopology
+    ) {
+        return msInvokeWithRetry(msGetter -> {
+            int zoneId = zoneDescriptor.id();
+
+            return msGetter.get(List.of(zoneDataNodesHistoryKey(zoneId), zoneScaleUpTimerKey(zoneId), zoneScaleDownTimerKey(zoneId)))
+                    .thenApply(dataNodeHistoryContext -> {
+                        if (dataNodeHistoryContext == null) {
+                            // This means that the zone was not initialized yet. The initial history entry with current topology will
+                            // be written on the zone init.
+                            return null;
+                        }
+
+                        DataNodesHistory dataNodesHistory = dataNodeHistoryContext.dataNodesHistory();
+
+                        if (dataNodesHistory.entryIsPresentAtExactTimestamp(timestamp)) {
+                            return null;
+                        }
+
+                        LOG.debug("Topology change detected [zoneId={}, timestamp={}, newTopology={}, oldTopology={}].", zoneId, timestamp,
+                                nodeNames(newLogicalTopology), nodeNames(oldLogicalTopology));
+
+                        DistributionZoneTimer scaleUpTimer = dataNodeHistoryContext.scaleUpTimer();
+                        DistributionZoneTimer scaleDownTimer = dataNodeHistoryContext.scaleDownTimer();
+
+                        Pair<HybridTimestamp, Set<NodeWithAttributes>> latestDataNodes = dataNodesHistory.dataNodesForTimestamp(timestamp);
+
+                        Set<NodeWithAttributes> addedNodes = newLogicalTopology.stream()
+                                .filter(node -> !latestDataNodes.getSecond().contains(node))
+                                .collect(toSet());
+
+                        Set<NodeWithAttributes> addedNodesComparingToOldTopology = newLogicalTopology.stream()
+                                .filter(node -> !oldLogicalTopology.contains(node))
+                                .collect(toSet());
+
+                        Set<NodeWithAttributes> removedNodes = latestDataNodes.getSecond().stream()
+                                .filter(node -> !newLogicalTopology.contains(node) && !Objects.equals(node.nodeName(), localNodeName))
+                                .filter(node -> !scaleDownTimer.nodes().contains(node))
+                                .collect(toSet());
+
+                        if ((!addedNodes.isEmpty() || !removedNodes.isEmpty())
+                                && zoneDescriptor.dataNodesAutoAdjust() != INFINITE_TIMER_VALUE) {
+                            // TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
+                            throw new UnsupportedOperationException("Data nodes auto adjust is not supported.");
+                        }
+
+                        int partitionResetDelay = partitionDistributionResetTimeoutSupplier.getAsInt();
+
+                        if (!removedNodes.isEmpty()
+                                && zoneDescriptor.consistencyMode() == HIGH_AVAILABILITY
+                                && partitionResetDelay != INFINITE_TIMER_VALUE) {
+                            reschedulePartitionReset(partitionResetDelay, () -> partitionResetClosure.accept(revision, zoneId), zoneId);
+                        }
+
+                        DistributionZoneTimer mergedScaleUpTimer = mergeTimerOnTopologyChange(zoneDescriptor, timestamp,
+                                scaleUpTimer, addedNodes, newLogicalTopology, true);
+
+                        DistributionZoneTimer mergedScaleDownTimer = mergeTimerOnTopologyChange(zoneDescriptor, timestamp,
+                                scaleDownTimer, removedNodes, newLogicalTopology, false);
+
+                        Pair<HybridTimestamp, Set<NodeWithAttributes>> currentDataNodes = currentDataNodes(
+                                timestamp,
+                                dataNodesHistory,
+                                mergedScaleUpTimer,
+                                mergedScaleDownTimer,
+                                zoneDescriptor
+                        );
+
+                        DistributionZoneTimer scaleUpTimerToSave = timerToSave(timestamp, mergedScaleUpTimer);
+                        DistributionZoneTimer scaleDownTimerToSave = timerToSave(timestamp, mergedScaleDownTimer);
+
+                        boolean addMandatoryEntry = !addedNodesComparingToOldTopology.isEmpty();
+
+                        return new LoggedIif(iif(
+                                and(
+                                        dataNodesHistoryEqualToOrNotExists(zoneId, dataNodesHistory),
+                                        and(
+                                                timerEqualToOrNotExists(zoneScaleUpTimerKey(zoneId), scaleUpTimer),
+                                                timerEqualToOrNotExists(zoneScaleDownTimerKey(zoneId), scaleDownTimer)
+                                        )
+                                ),
+                                ops(
+                                        addNewEntryToDataNodesHistory(zoneId, dataNodesHistory, currentDataNodes.getFirst(),
+                                                currentDataNodes.getSecond(), addMandatoryEntry),
+                                        renewTimer(zoneScaleUpTimerKey(zoneId), scaleUpTimerToSave),
+                                        renewTimer(zoneScaleDownTimerKey(zoneId), scaleDownTimerToSave)
+                                ).yield(true),
+                                ops().yield(false)
+                        ),
+                        updateHistoryLogRecord(zoneId, "topology change", dataNodesHistory, timestamp, currentDataNodes.getFirst(),
+                                currentDataNodes.getSecond(), scaleUpTimerToSave, scaleDownTimerToSave, addMandatoryEntry),
+                        "Failed to update data nodes history and timers on topology change [timestamp=" + timestamp + "]."
+                        );
+                    });
+        }, zoneDescriptor);
+    }
+
+    private static DistributionZoneTimer mergeTimerOnTopologyChange(
+            CatalogZoneDescriptor zoneDescriptor,
+            HybridTimestamp timestamp,
+            DistributionZoneTimer currentTimer,
+            Set<NodeWithAttributes> nodes,
+            Set<NodeWithAttributes> logicalTopology,
+            boolean scaleUp
+    ) {
+        DistributionZoneTimer timer;

Review Comment:
   removed


