zentol commented on a change in pull request #14852:
URL: https://github.com/apache/flink/pull/14852#discussion_r570108832



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResources.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * State which describes that the scheduler is waiting for resources in order 
to execute the job.
+ */
+class WaitingForResources implements State, ResourceConsumer {
+
+    private final Context context;
+
+    private final Logger logger;

Review comment:
       ```suggestion
       private final Logger log;
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResources.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * State which describes that the scheduler is waiting for resources in order 
to execute the job.
+ */
+class WaitingForResources implements State, ResourceConsumer {
+
+    private final Context context;
+
+    private final Logger logger;
+
+    private final ResourceCounter desiredResources;
+
+    WaitingForResources(Context context, Logger logger, ResourceCounter 
desiredResources) {
+        this.context = context;
+        this.logger = logger;
+        this.desiredResources = desiredResources;
+    }
+
+    @Override
+    public void onEnter() {
+        context.runIfState(this, this::resourceTimeout, 
Duration.ofSeconds(10L));

Review comment:
       If we merge this as is, please file a ticket for making the timeout 
configurable and increasing the default to a higher value.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResources.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * State which describes that the scheduler is waiting for resources in order 
to execute the job.
+ */
+class WaitingForResources implements State, ResourceConsumer {
+
+    private final Context context;
+
+    private final Logger logger;
+
+    private final ResourceCounter desiredResources;
+
+    WaitingForResources(Context context, Logger logger, ResourceCounter 
desiredResources) {
+        this.context = context;
+        this.logger = logger;
+        this.desiredResources = desiredResources;
+    }
+
+    @Override
+    public void onEnter() {
+        context.runIfState(this, this::resourceTimeout, 
Duration.ofSeconds(10L));
+        notifyNewResourcesAvailable();
+    }
+
+    @Override
+    public void cancel() {
+        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, 
null));
+    }
+
+    @Override
+    public void suspend(Throwable cause) {
+        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, 
cause));
+    }
+
+    @Override
+    public JobStatus getJobStatus() {
+        return JobStatus.INITIALIZING;
+    }
+
+    @Override
+    public ArchivedExecutionGraph getJob() {
+        return context.getArchivedExecutionGraph(getJobStatus(), null);
+    }
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, 
cause));
+    }
+
+    @Override
+    public Logger getLogger() {
+        return logger;
+    }
+
+    @Override
+    public void notifyNewResourcesAvailable() {
+        if (context.hasEnoughResources(desiredResources)) {
+            createExecutionGraphWithAvailableResources();
+        }
+    }
+
+    private void resourceTimeout() {
+        createExecutionGraphWithAvailableResources();
+    }
+
+    private void createExecutionGraphWithAvailableResources() {
+        try {
+            final ExecutionGraph executionGraph =
+                    context.createExecutionGraphWithAvailableResources();
+
+            context.goToExecuting(executionGraph);
+        } catch (Exception exception) {
+            logger.error("handling initialization failure", exception);

Review comment:
       We should clean this up a bit. I only added it back then because it 
seemed like this transition was never logged, but ideally the context logs all 
state transitions with the appropriate exception.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the WaitingForResources state. */
+public class WaitingForResourcesTest extends TestLogger {
+    private static final ResourceCounter RESOURCE_COUNTER =
+            ResourceCounter.withResource(ResourceProfile.ANY, 1);
+
+    /** WaitingForResources is transitioning to Executing if there are enough 
resources. */
+    @Test
+    public void testTransitionToExecuting() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> true);
+
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.onEnter();
+        }
+    }
+
+    @Test
+    public void testTransitionToFinishedOnFailure() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> true);
+            ctx.setCreateExecutionGraphWithAvailableResources(
+                    () -> {
+                        throw new RuntimeException("Test exception");
+                    });
+
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
+                    }));
+            wfr.onEnter();
+        }
+    }
+
+    @Test
+    public void testNotEnoughResources() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            // we expect no state transition.
+            wfr.onEnter();
+            wfr.notifyNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNotifyNewResourcesAvailable() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false); // initially, not enough 
resources
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            wfr.onEnter();
+            ctx.setExpectExecuting(assertNonNull());
+            ctx.setHasEnoughResources(() -> true); // make resources available
+            wfr.notifyNewResourcesAvailable(); // .. and notify
+        }
+    }
+
+    @Test
+    public void testResourceTimeout() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.onEnter();
+
+            // immediately execute all scheduled runnables
+            assertThat(ctx.getScheduledRunnables().size(), greaterThan(0));
+            for (ScheduledRunnable scheduledRunnable : 
ctx.getScheduledRunnables()) {
+                if (scheduledRunnable.getExpectedState() == wfr) {
+                    scheduledRunnable.runAction();
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testTransitionToFinishedOnGlobalFailure() throws Exception {
+        final String testExceptionString = "This is a test exception";
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            ctx.setExpectFinished(
+                    archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
+                        assertThat(archivedExecutionGraph.getFailureInfo(), 
notNullValue());
+                        assertTrue(
+                                archivedExecutionGraph
+                                        .getFailureInfo()
+                                        .getExceptionAsString()
+                                        .contains(testExceptionString));
+                    });
+            wfr.onEnter();
+
+            wfr.handleGlobalFailure(new RuntimeException(testExceptionString));
+        }
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.CANCELED));
+                    }));
+            wfr.onEnter();

Review comment:
       ```suggestion
               wfr.onEnter();
               ctx.setExpectFinished(
                       (archivedExecutionGraph -> {
                           assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.CANCELED));
                       }));
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Consumer;
+
+/**
+ * Utility for state test classes (e.g. {@link WaitingForResourcesTest}) to 
track if correct input
+ * has been presented and if the state transition happened.
+ *
+ * @param <T> Type of the state to validate.
+ */
+public class StateValidator<T> {
+
+    private Runnable trap = () -> {};
+    private Consumer<T> consumer = null;
+    private final String stateName;
+
+    public StateValidator(String stateName) {
+        this.stateName = stateName;
+    }
+
+    /**
+     * Activate this validator (if the state transition hasn't been validated, 
it will fail in the
+     * close method).
+     *
+     * @param asserter Consumer which validates the input to the state 
transition.
+     */
+    public void activate(Consumer<T> asserter) {
+        consumer = Preconditions.checkNotNull(asserter);
+        trap =
+                () -> {
+                    throw new AssertionError("no transition to " + stateName);
+                };
+    }
+
+    /**
+     * Call this method on the state transition, to register the transition, 
and validate the passed
+     * arguments.
+     *
+     * @param input Argument(s) of the state transition.
+     * @throws NullPointerException If no comsumer has been set (an unexpected 
state transition
+     *     occurred)
+     */
+    public void validateInput(T input) {
+        Preconditions.checkNotNull(consumer, "No consumer set. Unexpected 
state transition?");

Review comment:
       After we have validated the input, we can also reset the consumer to 
prevent a second transition into the same state.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the WaitingForResources state. */
+public class WaitingForResourcesTest extends TestLogger {
+    private static final ResourceCounter RESOURCE_COUNTER =
+            ResourceCounter.withResource(ResourceProfile.ANY, 1);
+
+    /** WaitingForResources is transitioning to Executing if there are enough 
resources. */
+    @Test
+    public void testTransitionToExecuting() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> true);
+
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.onEnter();
+        }
+    }
+
+    @Test
+    public void testTransitionToFinishedOnFailure() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> true);
+            ctx.setCreateExecutionGraphWithAvailableResources(
+                    () -> {
+                        throw new RuntimeException("Test exception");
+                    });
+
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
+                    }));
+            wfr.onEnter();
+        }
+    }
+
+    @Test
+    public void testNotEnoughResources() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            // we expect no state transition.
+            wfr.onEnter();
+            wfr.notifyNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNotifyNewResourcesAvailable() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false); // initially, not enough 
resources
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            wfr.onEnter();
+            ctx.setExpectExecuting(assertNonNull());
+            ctx.setHasEnoughResources(() -> true); // make resources available
+            wfr.notifyNewResourcesAvailable(); // .. and notify
+        }
+    }
+
+    @Test
+    public void testResourceTimeout() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.onEnter();
+
+            // immediately execute all scheduled runnables
+            assertThat(ctx.getScheduledRunnables().size(), greaterThan(0));
+            for (ScheduledRunnable scheduledRunnable : 
ctx.getScheduledRunnables()) {
+                if (scheduledRunnable.getExpectedState() == wfr) {
+                    scheduledRunnable.runAction();
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testTransitionToFinishedOnGlobalFailure() throws Exception {
+        final String testExceptionString = "This is a test exception";
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            ctx.setExpectFinished(
+                    archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
+                        assertThat(archivedExecutionGraph.getFailureInfo(), 
notNullValue());
+                        assertTrue(
+                                archivedExecutionGraph
+                                        .getFailureInfo()
+                                        .getExceptionAsString()
+                                        .contains(testExceptionString));
+                    });
+            wfr.onEnter();
+
+            wfr.handleGlobalFailure(new RuntimeException(testExceptionString));
+        }
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.CANCELED));
+                    }));
+            wfr.onEnter();
+            wfr.cancel();
+        }
+    }
+
+    @Test
+    public void testSuspend() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.SUSPENDED));
+                        assertThat(archivedExecutionGraph.getFailureInfo(), 
notNullValue());
+                    }));
+            wfr.onEnter();
+            wfr.suspend(new RuntimeException("suspend"));

Review comment:
       ```suggestion
               wfr.onEnter();
               ctx.setExpectFinished(
                       (archivedExecutionGraph -> {
                           assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.SUSPENDED));
                           assertThat(archivedExecutionGraph.getFailureInfo(), 
notNullValue());
                       }));
               wfr.suspend(new RuntimeException("suspend"));
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the WaitingForResources state. */
+public class WaitingForResourcesTest extends TestLogger {
+    private static final ResourceCounter RESOURCE_COUNTER =
+            ResourceCounter.withResource(ResourceProfile.ANY, 1);
+
+    /** WaitingForResources is transitioning to Executing if there are enough 
resources. */
+    @Test
+    public void testTransitionToExecuting() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> true);
+
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.onEnter();
+        }
+    }
+
+    @Test
+    public void testTransitionToFinishedOnFailure() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> true);
+            ctx.setCreateExecutionGraphWithAvailableResources(
+                    () -> {
+                        throw new RuntimeException("Test exception");
+                    });
+
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
+                    }));
+            wfr.onEnter();
+        }
+    }
+
+    @Test
+    public void testNotEnoughResources() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            // we expect no state transition.
+            wfr.onEnter();
+            wfr.notifyNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNotifyNewResourcesAvailable() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false); // initially, not enough 
resources
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            wfr.onEnter();
+            ctx.setExpectExecuting(assertNonNull());
+            ctx.setHasEnoughResources(() -> true); // make resources available
+            wfr.notifyNewResourcesAvailable(); // .. and notify
+        }
+    }
+
+    @Test
+    public void testResourceTimeout() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.onEnter();

Review comment:
       ```suggestion
               wfr.onEnter();
               ctx.setExpectExecuting(assertNonNull());
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Consumer;
+
+/**
+ * Utility for state test classes (e.g. {@link WaitingForResourcesTest}) to 
track if correct input
+ * has been presented and if the state transition happened.
+ *
+ * @param <T> Type of the state to validate.
+ */
+public class StateValidator<T> {
+
+    private Runnable trap = () -> {};
+    private Consumer<T> consumer = null;
+    private final String stateName;
+
+    public StateValidator(String stateName) {
+        this.stateName = stateName;
+    }
+
+    /**
+     * Activate this validator (if the state transition hasn't been validated, 
it will fail in the
+     * close method).
+     *
+     * @param asserter Consumer which validates the input to the state 
transition.
+     */
+    public void activate(Consumer<T> asserter) {
+        consumer = Preconditions.checkNotNull(asserter);
+        trap =
+                () -> {
+                    throw new AssertionError("no transition to " + stateName);
+                };
+    }
+
+    /**
+     * Call this method on the state transition, to register the transition, 
and validate the passed
+     * arguments.
+     *
+     * @param input Argument(s) of the state transition.
+     * @throws NullPointerException If no comsumer has been set (an unexpected 
state transition

Review comment:
       ```suggestion
        * @throws NullPointerException If no consumer has been set (an 
unexpected state transition
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the WaitingForResources state. */
+public class WaitingForResourcesTest extends TestLogger {
+    private static final ResourceCounter RESOURCE_COUNTER =
+            ResourceCounter.withResource(ResourceProfile.ANY, 1);
+
+    /** WaitingForResources is transitioning to Executing if there are enough 
resources. */
+    @Test
+    public void testTransitionToExecuting() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> true);
+
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.onEnter();
+        }
+    }
+
+    @Test
+    public void testTransitionToFinishedOnFailure() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> true);
+            ctx.setCreateExecutionGraphWithAvailableResources(
+                    () -> {
+                        throw new RuntimeException("Test exception");
+                    });
+
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
+                    }));
+            wfr.onEnter();
+        }
+    }
+
+    @Test
+    public void testNotEnoughResources() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            // we expect no state transition.
+            wfr.onEnter();
+            wfr.notifyNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNotifyNewResourcesAvailable() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false); // initially, not enough 
resources
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            wfr.onEnter();
+            ctx.setExpectExecuting(assertNonNull());
+            ctx.setHasEnoughResources(() -> true); // make resources available

Review comment:
       ```suggestion
               ctx.setHasEnoughResources(() -> true); // make resources 
available
               ctx.setExpectExecuting(assertNonNull());
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Consumer;
+
+/**
+ * Utility for state test classes (e.g. {@link WaitingForResourcesTest}) to 
track if correct input
+ * has been presented and if the state transition happened.
+ *
+ * @param <T> Type of the state to validate.
+ */
+public class StateValidator<T> {
+
+    private Runnable trap = () -> {};
+    private Consumer<T> consumer = null;
+    private final String stateName;
+
+    public StateValidator(String stateName) {
+        this.stateName = stateName;
+    }
+
+    /**
+     * Activate this validator (if the state transition hasn't been validated, 
it will fail in the
+     * close method).
+     *
+     * @param asserter Consumer which validates the input to the state 
transition.
+     */
+    public void activate(Consumer<T> asserter) {
+        consumer = Preconditions.checkNotNull(asserter);
+        trap =
+                () -> {
+                    throw new AssertionError("no transition to " + stateName);
+                };
+    }
+
+    /**
+     * Call this method on the state transition, to register the transition, 
and validate the passed
+     * arguments.
+     *
+     * @param input Argument(s) of the state transition.
+     * @throws NullPointerException If no comsumer has been set (an unexpected 
state transition
+     *     occurred)
+     */
+    public void validateInput(T input) {
+        Preconditions.checkNotNull(consumer, "No consumer set. Unexpected 
state transition?");

Review comment:
       We could use an assertion here; I think it conveys more clearly that the 
test did not run as expected.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/State.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * State abstraction of the {@link DeclarativeScheduler}. This interface 
contains all methods every
+ * state implementation must support.
+ */
+interface State {

Review comment:
       Note: had shared classes such as this been added first in a separate PR, 
we could review all states in isolation without their PRs having to be based on 
one another.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the WaitingForResources state. */
+public class WaitingForResourcesTest extends TestLogger {
+    private static final ResourceCounter RESOURCE_COUNTER =
+            ResourceCounter.withResource(ResourceProfile.ANY, 1);
+
+    /** WaitingForResources is transitioning to Executing if there are enough 
resources. */
+    @Test
+    public void testTransitionToExecuting() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> true);
+
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.onEnter();
+        }
+    }
+
+    @Test
+    public void testTransitionToFinishedOnFailure() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> true);
+            ctx.setCreateExecutionGraphWithAvailableResources(
+                    () -> {
+                        throw new RuntimeException("Test exception");
+                    });
+
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
+                    }));
+            wfr.onEnter();
+        }
+    }
+
+    @Test
+    public void testNotEnoughResources() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            // we expect no state transition.
+            wfr.onEnter();
+            wfr.notifyNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNotifyNewResourcesAvailable() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false); // initially, not enough 
resources
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            wfr.onEnter();
+            ctx.setExpectExecuting(assertNonNull());
+            ctx.setHasEnoughResources(() -> true); // make resources available
+            wfr.notifyNewResourcesAvailable(); // .. and notify
+        }
+    }
+
+    @Test
+    public void testResourceTimeout() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.onEnter();
+
+            // immediately execute all scheduled runnables
+            assertThat(ctx.getScheduledRunnables().size(), greaterThan(0));
+            for (ScheduledRunnable scheduledRunnable : 
ctx.getScheduledRunnables()) {
+                if (scheduledRunnable.getExpectedState() == wfr) {
+                    scheduledRunnable.runAction();
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testTransitionToFinishedOnGlobalFailure() throws Exception {
+        final String testExceptionString = "This is a test exception";
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> false);
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+
+            ctx.setExpectFinished(
+                    archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
+                        assertThat(archivedExecutionGraph.getFailureInfo(), 
notNullValue());
+                        assertTrue(
+                                archivedExecutionGraph
+                                        .getFailureInfo()
+                                        .getExceptionAsString()
+                                        .contains(testExceptionString));
+                    });
+            wfr.onEnter();

Review comment:
       ```suggestion
               wfr.onEnter();
               ctx.setExpectFinished(
                       archivedExecutionGraph -> {
                           assertThat(archivedExecutionGraph.getState(), 
is(JobStatus.FAILED));
                           assertThat(archivedExecutionGraph.getFailureInfo(), 
notNullValue());
                           assertTrue(
                                   archivedExecutionGraph
                                           .getFailureInfo()
                                           .getExceptionAsString()
                                           .contains(testExceptionString));
                       });
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the WaitingForResources state. */
+public class WaitingForResourcesTest extends TestLogger {
+    private static final ResourceCounter RESOURCE_COUNTER =
+            ResourceCounter.withResource(ResourceProfile.ANY, 1);
+
+    /** WaitingForResources is transitioning to Executing if there are enough 
resources. */
+    @Test
+    public void testTransitionToExecuting() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+            ctx.setHasEnoughResources(() -> true);
+
+            WaitingForResources wfr = new WaitingForResources(ctx, log, 
RESOURCE_COUNTER);
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.onEnter();
+        }
+    }
+
+    @Test
+    public void testTransitionToFinishedOnFailure() throws Exception {

Review comment:
       ```suggestion
       public void 
testTransitionToFinishedOnExecutionGraphInitializationFailure() throws 
Exception {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Consumer;
+
+/**
+ * Utility for state test classes (e.g. {@link WaitingForResourcesTest}) to 
track if correct input
+ * has been presented and if the state transition happened.
+ *
+ * @param <T> Type of the state to validate.
+ */
+public class StateValidator<T> {
+
+    private Runnable trap = () -> {};
+    private Consumer<T> consumer = null;
+    private final String stateName;
+
+    public StateValidator(String stateName) {
+        this.stateName = stateName;
+    }
+
+    /**
+     * Activate this validator (if the state transition hasn't been validated, 
it will fail in the
+     * close method).
+     *
+     * @param asserter Consumer which validates the input to the state 
transition.
+     */
+    public void activate(Consumer<T> asserter) {

Review comment:
       `Activate` implies that the validator is inactive by default, but that 
isn't really true since `validateInput` may throw an error regardless.
   

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Consumer;
+
+/**
+ * Utility for state test classes (e.g. {@link WaitingForResourcesTest}) to 
track if correct input
+ * has been presented and if the state transition happened.
+ *
+ * @param <T> Type of the state to validate.
+ */
+public class StateValidator<T> {
+
+    private Runnable trap = () -> {};
+    private Consumer<T> consumer = null;
+    private final String stateName;
+
+    public StateValidator(String stateName) {

Review comment:
       have you thought about using the `State` class here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.function.Consumer;
+
+/**
+ * Utility for state test classes (e.g. {@link WaitingForResourcesTest}) to 
track if correct input
+ * has been presented and if the state transition happened.
+ *
+ * @param <T> Type of the state to validate.
+ */
+public class StateValidator<T> {
+
+    private Runnable trap = () -> {};
+    private Consumer<T> consumer = null;
+    private final String stateName;
+
+    public StateValidator(String stateName) {
+        this.stateName = stateName;
+    }
+
+    /**
+     * Activate this validator (if the state transition hasn't been validated, 
it will fail in the
+     * close method).
+     *
+     * @param asserter Consumer which validates the input to the state 
transition.
+     */
+    public void activate(Consumer<T> asserter) {
+        consumer = Preconditions.checkNotNull(asserter);
+        trap =
+                () -> {
+                    throw new AssertionError("no transition to " + stateName);
+                };
+    }
+
+    /**
+     * Call this method on the state transition, to register the transition, 
and validate the passed
+     * arguments.
+     *
+     * @param input Argument(s) of the state transition.
+     * @throws NullPointerException If no comsumer has been set (an unexpected 
state transition
+     *     occurred)
+     */
+    public void validateInput(T input) {
+        Preconditions.checkNotNull(consumer, "No consumer set. Unexpected 
state transition?");

Review comment:
       Actually, a nicer overall approach would be to have the default consumer 
do this assertion. We can encapsulate the setup of this consumer in a new 
`expectNoStateTransition` method that is called in the constructor, which 
should make things easier to understand.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/State.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * State abstraction of the {@link DeclarativeScheduler}. This interface 
contains all methods every
+ * state implementation must support.
+ */
+interface State {
+
+    /** This method is called whenever one transitions into this state. */
+    default void onEnter() {}
+
+    /**
+     * This method is called whenever one transitions out of this state.
+     *
+     * @param newState newState is the state into which the scheduler 
transitions
+     */
+    default void onLeave(State newState) {}

Review comment:
       I thought we no longer need this?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the WaitingForResources state. */
+public class WaitingForResourcesTest extends TestLogger {
+    private static final ResourceCounter RESOURCE_COUNTER =
+            ResourceCounter.withResource(ResourceProfile.ANY, 1);

Review comment:
       ```suggestion
               ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
   ```
   ANY should only be used for describing the resources that a slot provides, 
while UNKNOWN must only be used to specify requirements. This distinction is 
important because you run into tediously subtle issues if you violate this in 
production code. 😩 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to