rkhachatryan commented on code in PR #25280:
URL: https://github.com/apache/flink/pull/25280#discussion_r1818949028


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.Temporal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.function.Supplier;
+
+/**
+ * {@code DefaultStateTransitionManager} is a state machine which manages the {@link
+ * AdaptiveScheduler}'s state transitions based on the previous transition time and the available
+ * resources. See {@link Phase} for details on each individual phase of this state machine. Note: We
+ * use the term phase here to avoid confusion with the state used in the {@link AdaptiveScheduler}.
+ *
+ * <pre>
+ * {@link Cooldown}
+ *   |
+ *   +--> {@link Idling}
+ *   |      |
+ *   |      V
+ *   +--> {@link Stabilizing}
+ *          |
+ *          +--> {@link Stabilized} --> {@link Idling}
+ *          |      |
+ *          |      V
+ *          \--> {@link Transitioning}
+ * </pre>
+ *
+ * <p>Thread-safety: This class is not implemented in a thread-safe manner and relies on the fact
+ * that any method call happens within a single thread.
+ *
+ * @see Executing
+ */
+@NotThreadSafe
+public class DefaultStateTransitionManager implements StateTransitionManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStateTransitionManager.class);
+
+    private final Supplier<Temporal> clock;
+    private final StateTransitionManager.Context transitionContext;
+    private Phase phase;
+    private final List<ScheduledFuture<?>> scheduledFutures;
+
+    @VisibleForTesting final Duration cooldownTimeout;
+    @Nullable @VisibleForTesting final Duration resourceStabilizationTimeout;
+    @VisibleForTesting final Duration maxTriggerDelay;
+
+    DefaultStateTransitionManager(
+            Temporal initializationTime,
+            StateTransitionManager.Context transitionContext,
+            Duration cooldownTimeout,
+            @Nullable Duration resourceStabilizationTimeout,
+            Duration maxTriggerDelay) {
+        this(
+                initializationTime,
+                Instant::now,
+                transitionContext,
+                cooldownTimeout,
+                resourceStabilizationTimeout,
+                maxTriggerDelay);
+    }
+
+    @VisibleForTesting
+    DefaultStateTransitionManager(
+            Temporal initializationTime,
+            Supplier<Temporal> clock,
+            StateTransitionManager.Context transitionContext,
+            Duration cooldownTimeout,
+            @Nullable Duration resourceStabilizationTimeout,
+            Duration maxTriggerDelay) {
+
+        this.clock = clock;
+        this.maxTriggerDelay = maxTriggerDelay;
+        this.cooldownTimeout = cooldownTimeout;
+        this.resourceStabilizationTimeout = resourceStabilizationTimeout;
+        this.transitionContext = transitionContext;
+        this.scheduledFutures = new ArrayList<>();
+        this.phase = new Cooldown(initializationTime, clock, this, 
cooldownTimeout);
+    }
+
+    @Override
+    public void onChange() {
+        phase.onChange();
+    }
+
+    @Override
+    public void onTrigger() {
+        phase.onTrigger();
+    }
+
+    @Override
+    public void close() {
+        scheduledFutures.forEach(future -> future.cancel(true));
+        scheduledFutures.clear();
+    }
+
+    @VisibleForTesting
+    Phase getPhase() {
+        return phase;
+    }
+
+    private void progressToIdling() {
+        progressToPhase(new Idling(clock, this));
+    }
+
+    private void progressToStabilizing(Temporal firstChangeEventTimestamp) {
+        progressToPhase(
+                new Stabilizing(
+                        clock,
+                        this,
+                        resourceStabilizationTimeout,
+                        firstChangeEventTimestamp,
+                        maxTriggerDelay));
+    }
+
+    private void progressToStabilized(Temporal firstChangeEventTimestamp) {
+        progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp, 
maxTriggerDelay));
+    }
+
+    private void triggerTransitionToSubsequentState() {
+        progressToPhase(new Transitioning(clock, this));
+        transitionContext.transitionToSubsequentState();
+    }
+
+    private void progressToPhase(Phase newPhase) {
+        Preconditions.checkState(
+                !(phase instanceof Transitioning),
+                "The state transition operation has already been triggered.");
+        LOG.debug("Transitioning from {} to {}.", phase, newPhase);

Review Comment:
   This is logged at DEBUG level (it was INFO before, IIRC)
   ```
   Transitioning from org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager$Stabilizing@79930ca7 to org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager$Stabilized@4db16ef7.
   ```
   without the job ID (neither in the log message ~nor in MDC~).
   I think this is a regression.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to