zentol commented on a change in pull request #14852: URL: https://github.com/apache/flink/pull/14852#discussion_r570108832
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResources.java ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; + +/** + * State which describes that the scheduler is waiting for resources in order to execute the job. + */ +class WaitingForResources implements State, ResourceConsumer { + + private final Context context; + + private final Logger logger; Review comment: ```suggestion private final Logger log; ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResources.java ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; + +/** + * State which describes that the scheduler is waiting for resources in order to execute the job. + */ +class WaitingForResources implements State, ResourceConsumer { + + private final Context context; + + private final Logger logger; + + private final ResourceCounter desiredResources; + + WaitingForResources(Context context, Logger logger, ResourceCounter desiredResources) { + this.context = context; + this.logger = logger; + this.desiredResources = desiredResources; + } + + @Override + public void onEnter() { + context.runIfState(this, this::resourceTimeout, Duration.ofSeconds(10L)); Review comment: If we merge this as is please file a ticket for making the timeout configurable and increasing the default to a higher value. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResources.java ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter; + +import org.slf4j.Logger; + +import javax.annotation.Nullable; + +import java.time.Duration; + +/** + * State which describes that the scheduler is waiting for resources in order to execute the job. 
+ */ +class WaitingForResources implements State, ResourceConsumer { + + private final Context context; + + private final Logger logger; + + private final ResourceCounter desiredResources; + + WaitingForResources(Context context, Logger logger, ResourceCounter desiredResources) { + this.context = context; + this.logger = logger; + this.desiredResources = desiredResources; + } + + @Override + public void onEnter() { + context.runIfState(this, this::resourceTimeout, Duration.ofSeconds(10L)); + notifyNewResourcesAvailable(); + } + + @Override + public void cancel() { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.CANCELED, null)); + } + + @Override + public void suspend(Throwable cause) { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause)); + } + + @Override + public JobStatus getJobStatus() { + return JobStatus.INITIALIZING; + } + + @Override + public ArchivedExecutionGraph getJob() { + return context.getArchivedExecutionGraph(getJobStatus(), null); + } + + @Override + public void handleGlobalFailure(Throwable cause) { + context.goToFinished(context.getArchivedExecutionGraph(JobStatus.FAILED, cause)); + } + + @Override + public Logger getLogger() { + return logger; + } + + @Override + public void notifyNewResourcesAvailable() { + if (context.hasEnoughResources(desiredResources)) { + createExecutionGraphWithAvailableResources(); + } + } + + private void resourceTimeout() { + createExecutionGraphWithAvailableResources(); + } + + private void createExecutionGraphWithAvailableResources() { + try { + final ExecutionGraph executionGraph = + context.createExecutionGraphWithAvailableResources(); + + context.goToExecuting(executionGraph); + } catch (Exception exception) { + logger.error("handling initialization failure", exception); Review comment: We should clean this up a bit. 
I only added it back then because it seems like this transition was never logged, but ideally the context logs all state transitions with the appropriate exception. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** Tests for the WaitingForResources state. */ +public class WaitingForResourcesTest extends TestLogger { + private static final ResourceCounter RESOURCE_COUNTER = + ResourceCounter.withResource(ResourceProfile.ANY, 1); + + /** WaitingForResources is transitioning to Executing if there are enough resources. 
*/ + @Test + public void testTransitionToExecuting() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> true); + + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + ctx.setExpectExecuting(assertNonNull()); + wfr.onEnter(); + } + } + + @Test + public void testTransitionToFinishedOnFailure() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> true); + ctx.setCreateExecutionGraphWithAvailableResources( + () -> { + throw new RuntimeException("Test exception"); + }); + + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + ctx.setExpectFinished( + (archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + })); + wfr.onEnter(); + } + } + + @Test + public void testNotEnoughResources() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + // we expect no state transition. + wfr.onEnter(); + wfr.notifyNewResourcesAvailable(); + } + } + + @Test + public void testNotifyNewResourcesAvailable() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); // initially, not enough resources + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + wfr.onEnter(); + ctx.setExpectExecuting(assertNonNull()); + ctx.setHasEnoughResources(() -> true); // make resources available + wfr.notifyNewResourcesAvailable(); // .. 
and notify + } + } + + @Test + public void testResourceTimeout() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + ctx.setExpectExecuting(assertNonNull()); + wfr.onEnter(); + + // immediately execute all scheduled runnables + assertThat(ctx.getScheduledRunnables().size(), greaterThan(0)); + for (ScheduledRunnable scheduledRunnable : ctx.getScheduledRunnables()) { + if (scheduledRunnable.getExpectedState() == wfr) { + scheduledRunnable.runAction(); + } + } + } + } + + @Test + public void testTransitionToFinishedOnGlobalFailure() throws Exception { + final String testExceptionString = "This is a test exception"; + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + assertThat(archivedExecutionGraph.getFailureInfo(), notNullValue()); + assertTrue( + archivedExecutionGraph + .getFailureInfo() + .getExceptionAsString() + .contains(testExceptionString)); + }); + wfr.onEnter(); + + wfr.handleGlobalFailure(new RuntimeException(testExceptionString)); + } + } + + @Test + public void testCancel() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + ctx.setExpectFinished( + (archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.CANCELED)); + })); + wfr.onEnter(); Review comment: ```suggestion wfr.onEnter(); ctx.setExpectFinished( (archivedExecutionGraph -> { assertThat(archivedExecutionGraph.getState(), is(JobStatus.CANCELED)); })); ``` ########## File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.util.Preconditions; + +import java.util.function.Consumer; + +/** + * Utility for state test classes (e.g. {@link WaitingForResourcesTest}) to track if correct input + * has been presented and if the state transition happened. + * + * @param <T> Type of the state to validate. + */ +public class StateValidator<T> { + + private Runnable trap = () -> {}; + private Consumer<T> consumer = null; + private final String stateName; + + public StateValidator(String stateName) { + this.stateName = stateName; + } + + /** + * Activate this validator (if the state transition hasn't been validated, it will fail in the + * close method). + * + * @param asserter Consumer which validates the input to the state transition. 
+ */ + public void activate(Consumer<T> asserter) { + consumer = Preconditions.checkNotNull(asserter); + trap = + () -> { + throw new AssertionError("no transition to " + stateName); + }; + } + + /** + * Call this method on the state transition, to register the transition, and validate the passed + * arguments. + * + * @param input Argument(s) of the state transition. + * @throws NullPointerException If no comsumer has been set (an unexpected state transition + * occurred) + */ + public void validateInput(T input) { + Preconditions.checkNotNull(consumer, "No consumer set. Unexpected state transition?"); Review comment: After we have validated the input we can also reset the consumer to prevent a second transition into the same state. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** Tests for the WaitingForResources state. */ +public class WaitingForResourcesTest extends TestLogger { + private static final ResourceCounter RESOURCE_COUNTER = + ResourceCounter.withResource(ResourceProfile.ANY, 1); + + /** WaitingForResources is transitioning to Executing if there are enough resources. 
*/ + @Test + public void testTransitionToExecuting() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> true); + + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + ctx.setExpectExecuting(assertNonNull()); + wfr.onEnter(); + } + } + + @Test + public void testTransitionToFinishedOnFailure() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> true); + ctx.setCreateExecutionGraphWithAvailableResources( + () -> { + throw new RuntimeException("Test exception"); + }); + + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + ctx.setExpectFinished( + (archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + })); + wfr.onEnter(); + } + } + + @Test + public void testNotEnoughResources() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + // we expect no state transition. + wfr.onEnter(); + wfr.notifyNewResourcesAvailable(); + } + } + + @Test + public void testNotifyNewResourcesAvailable() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); // initially, not enough resources + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + wfr.onEnter(); + ctx.setExpectExecuting(assertNonNull()); + ctx.setHasEnoughResources(() -> true); // make resources available + wfr.notifyNewResourcesAvailable(); // .. 
and notify + } + } + + @Test + public void testResourceTimeout() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + ctx.setExpectExecuting(assertNonNull()); + wfr.onEnter(); + + // immediately execute all scheduled runnables + assertThat(ctx.getScheduledRunnables().size(), greaterThan(0)); + for (ScheduledRunnable scheduledRunnable : ctx.getScheduledRunnables()) { + if (scheduledRunnable.getExpectedState() == wfr) { + scheduledRunnable.runAction(); + } + } + } + } + + @Test + public void testTransitionToFinishedOnGlobalFailure() throws Exception { + final String testExceptionString = "This is a test exception"; + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + assertThat(archivedExecutionGraph.getFailureInfo(), notNullValue()); + assertTrue( + archivedExecutionGraph + .getFailureInfo() + .getExceptionAsString() + .contains(testExceptionString)); + }); + wfr.onEnter(); + + wfr.handleGlobalFailure(new RuntimeException(testExceptionString)); + } + } + + @Test + public void testCancel() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + ctx.setExpectFinished( + (archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.CANCELED)); + })); + wfr.onEnter(); + wfr.cancel(); + } + } + + @Test + public void testSuspend() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + 
ctx.setExpectFinished( + (archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)); + assertThat(archivedExecutionGraph.getFailureInfo(), notNullValue()); + })); + wfr.onEnter(); + wfr.suspend(new RuntimeException("suspend")); Review comment: ```suggestion wfr.onEnter(); ctx.setExpectFinished( (archivedExecutionGraph -> { assertThat(archivedExecutionGraph.getState(), is(JobStatus.SUSPENDED)); assertThat(archivedExecutionGraph.getFailureInfo(), notNullValue()); })); wfr.suspend(new RuntimeException("suspend")); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** Tests for the WaitingForResources state. */ +public class WaitingForResourcesTest extends TestLogger { + private static final ResourceCounter RESOURCE_COUNTER = + ResourceCounter.withResource(ResourceProfile.ANY, 1); + + /** WaitingForResources is transitioning to Executing if there are enough resources. 
*/ + @Test + public void testTransitionToExecuting() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> true); + + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + ctx.setExpectExecuting(assertNonNull()); + wfr.onEnter(); + } + } + + @Test + public void testTransitionToFinishedOnFailure() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> true); + ctx.setCreateExecutionGraphWithAvailableResources( + () -> { + throw new RuntimeException("Test exception"); + }); + + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + ctx.setExpectFinished( + (archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + })); + wfr.onEnter(); + } + } + + @Test + public void testNotEnoughResources() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + // we expect no state transition. + wfr.onEnter(); + wfr.notifyNewResourcesAvailable(); + } + } + + @Test + public void testNotifyNewResourcesAvailable() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); // initially, not enough resources + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + wfr.onEnter(); + ctx.setExpectExecuting(assertNonNull()); + ctx.setHasEnoughResources(() -> true); // make resources available + wfr.notifyNewResourcesAvailable(); // .. 
and notify + } + } + + @Test + public void testResourceTimeout() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + ctx.setExpectExecuting(assertNonNull()); + wfr.onEnter(); Review comment: ```suggestion wfr.onEnter(); ctx.setExpectExecuting(assertNonNull()); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.util.Preconditions; + +import java.util.function.Consumer; + +/** + * Utility for state test classes (e.g. {@link WaitingForResourcesTest}) to track if correct input + * has been presented and if the state transition happened. + * + * @param <T> Type of the state to validate. 
+ */ +public class StateValidator<T> { + + private Runnable trap = () -> {}; + private Consumer<T> consumer = null; + private final String stateName; + + public StateValidator(String stateName) { + this.stateName = stateName; + } + + /** + * Activate this validator (if the state transition hasn't been validated, it will fail in the + * close method). + * + * @param asserter Consumer which validates the input to the state transition. + */ + public void activate(Consumer<T> asserter) { + consumer = Preconditions.checkNotNull(asserter); + trap = + () -> { + throw new AssertionError("no transition to " + stateName); + }; + } + + /** + * Call this method on the state transition, to register the transition, and validate the passed + * arguments. + * + * @param input Argument(s) of the state transition. + * @throws NullPointerException If no comsumer has been set (an unexpected state transition Review comment: ```suggestion * @throws NullPointerException If no consumer has been set (an unexpected state transition ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** Tests for the WaitingForResources state. */ +public class WaitingForResourcesTest extends TestLogger { + private static final ResourceCounter RESOURCE_COUNTER = + ResourceCounter.withResource(ResourceProfile.ANY, 1); + + /** WaitingForResources is transitioning to Executing if there are enough resources. 
*/ + @Test + public void testTransitionToExecuting() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> true); + + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + ctx.setExpectExecuting(assertNonNull()); + wfr.onEnter(); + } + } + + @Test + public void testTransitionToFinishedOnFailure() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> true); + ctx.setCreateExecutionGraphWithAvailableResources( + () -> { + throw new RuntimeException("Test exception"); + }); + + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + ctx.setExpectFinished( + (archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + })); + wfr.onEnter(); + } + } + + @Test + public void testNotEnoughResources() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + // we expect no state transition. + wfr.onEnter(); + wfr.notifyNewResourcesAvailable(); + } + } + + @Test + public void testNotifyNewResourcesAvailable() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); // initially, not enough resources + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + wfr.onEnter(); + ctx.setExpectExecuting(assertNonNull()); + ctx.setHasEnoughResources(() -> true); // make resources available Review comment: ```suggestion ctx.setHasEnoughResources(() -> true); // make resources available ctx.setExpectExecuting(assertNonNull()); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.util.Preconditions; + +import java.util.function.Consumer; + +/** + * Utility for state test classes (e.g. {@link WaitingForResourcesTest}) to track if correct input + * has been presented and if the state transition happened. + * + * @param <T> Type of the state to validate. + */ +public class StateValidator<T> { + + private Runnable trap = () -> {}; + private Consumer<T> consumer = null; + private final String stateName; + + public StateValidator(String stateName) { + this.stateName = stateName; + } + + /** + * Activate this validator (if the state transition hasn't been validated, it will fail in the + * close method). + * + * @param asserter Consumer which validates the input to the state transition. + */ + public void activate(Consumer<T> asserter) { + consumer = Preconditions.checkNotNull(asserter); + trap = + () -> { + throw new AssertionError("no transition to " + stateName); + }; + } + + /** + * Call this method on the state transition, to register the transition, and validate the passed + * arguments. + * + * @param input Argument(s) of the state transition. 
+ * @throws NullPointerException If no comsumer has been set (an unexpected state transition + * occurred) + */ + public void validateInput(T input) { + Preconditions.checkNotNull(consumer, "No consumer set. Unexpected state transition?"); Review comment: we could use an assertion here; I think it conveys more that the test did not run as expected ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/State.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; + +import java.util.Optional; + +/** + * State abstraction of the {@link DeclarativeScheduler}. This interface contains all methods every + * state implementation must support. 
+ */ +interface State { Review comment: note: Had shared classes such as this been added first in a separate PR we could review all states in isolation without having to be based on one another. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** Tests for the WaitingForResources state. */ +public class WaitingForResourcesTest extends TestLogger { + private static final ResourceCounter RESOURCE_COUNTER = + ResourceCounter.withResource(ResourceProfile.ANY, 1); + + /** WaitingForResources is transitioning to Executing if there are enough resources. 
*/ + @Test + public void testTransitionToExecuting() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> true); + + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + ctx.setExpectExecuting(assertNonNull()); + wfr.onEnter(); + } + } + + @Test + public void testTransitionToFinishedOnFailure() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> true); + ctx.setCreateExecutionGraphWithAvailableResources( + () -> { + throw new RuntimeException("Test exception"); + }); + + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + ctx.setExpectFinished( + (archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + })); + wfr.onEnter(); + } + } + + @Test + public void testNotEnoughResources() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + // we expect no state transition. + wfr.onEnter(); + wfr.notifyNewResourcesAvailable(); + } + } + + @Test + public void testNotifyNewResourcesAvailable() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); // initially, not enough resources + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + wfr.onEnter(); + ctx.setExpectExecuting(assertNonNull()); + ctx.setHasEnoughResources(() -> true); // make resources available + wfr.notifyNewResourcesAvailable(); // .. 
and notify + } + } + + @Test + public void testResourceTimeout() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + ctx.setExpectExecuting(assertNonNull()); + wfr.onEnter(); + + // immediately execute all scheduled runnables + assertThat(ctx.getScheduledRunnables().size(), greaterThan(0)); + for (ScheduledRunnable scheduledRunnable : ctx.getScheduledRunnables()) { + if (scheduledRunnable.getExpectedState() == wfr) { + scheduledRunnable.runAction(); + } + } + } + } + + @Test + public void testTransitionToFinishedOnGlobalFailure() throws Exception { + final String testExceptionString = "This is a test exception"; + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> false); + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + assertThat(archivedExecutionGraph.getFailureInfo(), notNullValue()); + assertTrue( + archivedExecutionGraph + .getFailureInfo() + .getExceptionAsString() + .contains(testExceptionString)); + }); + wfr.onEnter(); Review comment: ```suggestion wfr.onEnter(); ctx.setExpectFinished( archivedExecutionGraph -> { assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); assertThat(archivedExecutionGraph.getFailureInfo(), notNullValue()); assertTrue( archivedExecutionGraph .getFailureInfo() .getExceptionAsString() .contains(testExceptionString)); }); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** Tests for the WaitingForResources state. 
*/ +public class WaitingForResourcesTest extends TestLogger { + private static final ResourceCounter RESOURCE_COUNTER = + ResourceCounter.withResource(ResourceProfile.ANY, 1); + + /** WaitingForResources is transitioning to Executing if there are enough resources. */ + @Test + public void testTransitionToExecuting() throws Exception { + try (MockContext ctx = new MockContext()) { + ctx.setHasEnoughResources(() -> true); + + WaitingForResources wfr = new WaitingForResources(ctx, log, RESOURCE_COUNTER); + ctx.setExpectExecuting(assertNonNull()); + wfr.onEnter(); + } + } + + @Test + public void testTransitionToFinishedOnFailure() throws Exception { Review comment: ```suggestion public void testTransitionToFinishedOnExecutionGraphInitializationFailure() throws Exception { ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.util.Preconditions; + +import java.util.function.Consumer; + +/** + * Utility for state test classes (e.g. 
{@link WaitingForResourcesTest}) to track if correct input + * has been presented and if the state transition happened. + * + * @param <T> Type of the state to validate. + */ +public class StateValidator<T> { + + private Runnable trap = () -> {}; + private Consumer<T> consumer = null; + private final String stateName; + + public StateValidator(String stateName) { + this.stateName = stateName; + } + + /** + * Activate this validator (if the state transition hasn't been validated, it will fail in the + * close method). + * + * @param asserter Consumer which validates the input to the state transition. + */ + public void activate(Consumer<T> asserter) { Review comment: `Activate` implies that the validator is inactive by default, but that isn't really true since `validateInput` may throw an error regardless. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.util.Preconditions; + +import java.util.function.Consumer; + +/** + * Utility for state test classes (e.g. 
{@link WaitingForResourcesTest}) to track if correct input + * has been presented and if the state transition happened. + * + * @param <T> Type of the state to validate. + */ +public class StateValidator<T> { + + private Runnable trap = () -> {}; + private Consumer<T> consumer = null; + private final String stateName; + + public StateValidator(String stateName) { Review comment: have you thought about using the `State` class here? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/StateValidator.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.util.Preconditions; + +import java.util.function.Consumer; + +/** + * Utility for state test classes (e.g. {@link WaitingForResourcesTest}) to track if correct input + * has been presented and if the state transition happened. + * + * @param <T> Type of the state to validate. 
+ */ +public class StateValidator<T> { + + private Runnable trap = () -> {}; + private Consumer<T> consumer = null; + private final String stateName; + + public StateValidator(String stateName) { + this.stateName = stateName; + } + + /** + * Activate this validator (if the state transition hasn't been validated, it will fail in the + * close method). + * + * @param asserter Consumer which validates the input to the state transition. + */ + public void activate(Consumer<T> asserter) { + consumer = Preconditions.checkNotNull(asserter); + trap = + () -> { + throw new AssertionError("no transition to " + stateName); + }; + } + + /** + * Call this method on the state transition, to register the transition, and validate the passed + * arguments. + * + * @param input Argument(s) of the state transition. + * @throws NullPointerException If no comsumer has been set (an unexpected state transition + * occurred) + */ + public void validateInput(T input) { + Preconditions.checkNotNull(consumer, "No consumer set. Unexpected state transition?"); Review comment: actually, a nicer overall approach would be to have the default consumer do this assertion. We can encapsulate the setup of this consumer in a new `expectNoStateTransition` method which is called in the constructor which should make things easier to understand. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/State.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; + +import java.util.Optional; + +/** + * State abstraction of the {@link DeclarativeScheduler}. This interface contains all methods every + * state implementation must support. + */ +interface State { + + /** This method is called whenever one transitions into this state. */ + default void onEnter() {} + + /** + * This method is called whenever one transitions out of this state. + * + * @param newState newState is the state into which the scheduler transitions + */ + default void onLeave(State newState) {} Review comment: I thought we no longer need this? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/WaitingForResourcesTest.java ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.declarative; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.TestingExecutionGraphBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** Tests for the WaitingForResources state. 
+ */ +public class WaitingForResourcesTest extends TestLogger { + private static final ResourceCounter RESOURCE_COUNTER = + ResourceCounter.withResource(ResourceProfile.ANY, 1); Review comment: ```suggestion ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1); ``` ANY should only be used for describing the resources that a slot provides, while UNKNOWN must only be used to specify requirements. This distinction is important because you run into tediously subtle issues if you violate this in production code. 😩 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org