dawidwys commented on a change in pull request #13502:
URL: https://github.com/apache/flink/pull/13502#discussion_r496478478
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/RuntimeExecutionMode.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The runtime execution mode based on which Flink will pick
+ * how to schedule the tasks of a given job and what will be the default semantics.

Review comment:
nit: Personally, I don't like making the scheduling part so prominent here. It's just one of many characteristics. How about something along the lines of: `The setting determines the runtime behaviour, including, but not limited to, scheduling, time handling (whether there can be late data), incremental updates, etc.`, or something else that makes scheduling less prominent.
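A sketch of how the Javadoc could read if it follows the suggestion above, with per-constant Javadocs added as well. The constant names are taken from the diff (`STREAMING` and `BATCH` appear in the enum and the switch, `AUTOMATIC` in `determineExecutionMode`); the per-constant wording is an assumption, not text from the PR, and `@Internal` plus the `@see` link are left out so the snippet compiles without Flink.

```java
/**
 * The setting determines the runtime behaviour, including, but not limited to,
 * scheduling, time handling (whether there can be late data) and incremental updates.
 *
 * <p>Sketch only: the per-constant descriptions below are illustrative assumptions.
 */
enum RuntimeExecutionMode {

	/** Streaming execution; what AUTOMATIC resolves to when at least one source is continuous. */
	STREAMING,

	/** Batch execution; what AUTOMATIC resolves to when all sources are bounded. */
	BATCH,

	/** Lets the framework pick STREAMING or BATCH based on the boundedness of the sources. */
	AUTOMATIC
}
```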
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -216,13 +204,74 @@ public StreamGraph generate() {
 		return builtStreamGraph;
 	}
 
+	@VisibleForTesting
+	RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode chosenMode) {
+		if (checkNotNull(chosenMode) != RuntimeExecutionMode.AUTOMATIC) {
+			return chosenMode;
+		}
+
+		final boolean continuousSourceExists = transformations
+				.stream()
+				.anyMatch(transformation ->
+						isContinuousSource(transformation) ||
+						transformation
+								.getTransitivePredecessors()
+								.stream()
+								.anyMatch(this::isContinuousSource));
+
+		return continuousSourceExists
+				? RuntimeExecutionMode.STREAMING
+				: RuntimeExecutionMode.BATCH;
+	}
+
+	private boolean isContinuousSource(final Transformation<?> transformation) {
+		checkNotNull(transformation);
+		return transformation instanceof WithBoundedness &&
+				((WithBoundedness) transformation).getBoundedness() != Boundedness.BOUNDED;
+	}
+
+	private void configureStreamGraph(
+			final StreamGraph graph,
+			final RuntimeExecutionMode runtimeExecutionMode) {
+		checkNotNull(graph);
+		checkNotNull(runtimeExecutionMode);
+
+		// by now we must have disambiguated the runtime-mode in which to execute the pipeline.
+		verifyRuntimeModeIsSet();

Review comment:
Do we need to verify the `runtime mode` here? I think it would be better to move the check to the `generate` method.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/RuntimeExecutionMode.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The runtime execution mode based on which Flink will pick
+ * how to schedule the tasks of a given job and what will be the default semantics.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API">
+ * https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API</a>
+ */
+@Internal
+public enum RuntimeExecutionMode {
+	STREAMING,

Review comment:
Before making it `Public`, we should add Javadocs for the options.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -216,13 +204,74 @@ public StreamGraph generate() {
 		return builtStreamGraph;
 	}
 
+	@VisibleForTesting
+	RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode chosenMode) {
+		if (checkNotNull(chosenMode) != RuntimeExecutionMode.AUTOMATIC) {
+			return chosenMode;
+		}
+
+		final boolean continuousSourceExists = transformations
+				.stream()
+				.anyMatch(transformation ->
+						isContinuousSource(transformation) ||
+						transformation
+								.getTransitivePredecessors()
+								.stream()
+								.anyMatch(this::isContinuousSource));
+
+		return continuousSourceExists
+				? RuntimeExecutionMode.STREAMING
+				: RuntimeExecutionMode.BATCH;
+	}
+
+	private boolean isContinuousSource(final Transformation<?> transformation) {
+		checkNotNull(transformation);
+		return transformation instanceof WithBoundedness &&
+				((WithBoundedness) transformation).getBoundedness() != Boundedness.BOUNDED;
+	}
+
+	private void configureStreamGraph(
+			final StreamGraph graph,
+			final RuntimeExecutionMode runtimeExecutionMode) {
+		checkNotNull(graph);
+		checkNotNull(runtimeExecutionMode);
+
+		// by now we must have disambiguated the runtime-mode in which to execute the pipeline.
+		verifyRuntimeModeIsSet();
+
+		graph.setStateBackend(stateBackend);
+		graph.setChaining(chaining);
+		graph.setUserArtifacts(userArtifacts);
+		graph.setTimeCharacteristic(timeCharacteristic);
+		graph.setJobName(jobName);
+
+		switch (runtimeExecutionMode) {
+			case STREAMING:
+				graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
+				graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+				graph.setScheduleMode(ScheduleMode.EAGER);
+				break;
+			case BATCH:
+				// TODO: 24.09.20 copied from ExecutorUtils.setBatchProperties(StreamGraph...). Ask if correct!!!
+				graph.setAllVerticesInSameSlotSharingGroupByDefault(false);
+				graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
+				graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
+				setDefaultBufferTimeout(-1);
+				break;
+			default:
+				throw new IllegalArgumentException("Unknown execution mode");
+		}
+	}
+
 	/**
 	 * Transforms one {@code Transformation}.
 	 *
 	 * <p>This checks whether we already transformed it and exits early in that case. If not it
 	 * delegates to one of the transformation specific methods.
 	 */
 	private Collection<Integer> transform(Transformation<?> transform) {
+		// by now we must have disambiguated the runtime-mode in which to execute the pipeline.
+		verifyRuntimeModeIsSet();

Review comment:
nit: I am wondering whether that's too defensive.
It is a private method with a single entry point from `generate`; can't we check it only there? This method is called recursively for every transformation, so I am not sure the check isn't overkill. What do you think about moving the check to the `generate` method?

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorRuntimeExecutionModeDetection.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link StreamGraphGenerator#determineExecutionMode(RuntimeExecutionMode)}.
+ */
+public class StreamGraphGeneratorRuntimeExecutionModeDetection extends TestLogger {
+
+	@Test
+	public void testDetectionThroughTransitivePredecessors() {
+		final SourceTransformation<Integer> bounded =
+				getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
+		assertEquals(Boundedness.BOUNDED, bounded.getBoundedness());

Review comment:
Not sure about this assertion. Are we testing `getSourceTransformation` here? IMO it distracts from the method we intend to test.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
##########
@@ -51,7 +51,9 @@ public Pipeline createPipeline(List<Transformation<?>> transformations, TableCon
 			throw new IllegalStateException("No operators defined in streaming topology. Cannot generate StreamGraph.");
 		}
 		return new StreamGraphGenerator(
-			transformations, executionEnvironment.getConfig(), executionEnvironment.getCheckpointConfig())

Review comment:
nit: unrelated change

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -216,13 +204,74 @@ public StreamGraph generate() {
 		return builtStreamGraph;
 	}
 
+	@VisibleForTesting
+	RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode chosenMode) {
+		if (checkNotNull(chosenMode) != RuntimeExecutionMode.AUTOMATIC) {
+			return chosenMode;
+		}
+
+		final boolean continuousSourceExists = transformations
+				.stream()
+				.anyMatch(transformation ->
+						isContinuousSource(transformation) ||
+						transformation
+								.getTransitivePredecessors()
+								.stream()
+								.anyMatch(this::isContinuousSource));
+
+		return continuousSourceExists
+				? RuntimeExecutionMode.STREAMING
+				: RuntimeExecutionMode.BATCH;
+	}
+
+	private boolean isContinuousSource(final Transformation<?> transformation) {
+		checkNotNull(transformation);
+		return transformation instanceof WithBoundedness &&
+				((WithBoundedness) transformation).getBoundedness() != Boundedness.BOUNDED;
+	}
+
+	private void configureStreamGraph(
+			final StreamGraph graph,
+			final RuntimeExecutionMode runtimeExecutionMode) {
+		checkNotNull(graph);
+		checkNotNull(runtimeExecutionMode);
+
+		// by now we must have disambiguated the runtime-mode in which to execute the pipeline.
+		verifyRuntimeModeIsSet();
+
+		graph.setStateBackend(stateBackend);
+		graph.setChaining(chaining);
+		graph.setUserArtifacts(userArtifacts);
+		graph.setTimeCharacteristic(timeCharacteristic);
+		graph.setJobName(jobName);
+
+		switch (runtimeExecutionMode) {
+			case STREAMING:
+				graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
+				graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+				graph.setScheduleMode(ScheduleMode.EAGER);
+				break;
+			case BATCH:
+				// TODO: 24.09.20 copied from ExecutorUtils.setBatchProperties(StreamGraph...). Ask if correct!!!

Review comment:
I think it is correct ;)

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -216,13 +204,74 @@ public StreamGraph generate() {
 		return builtStreamGraph;
 	}
 
+	@VisibleForTesting
+	RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode chosenMode) {

Review comment:
nit: Could we test the PR differently? This method is not part of the class's contract, but by writing tests for it we are making it one. With the tests in place, it will be harder to, e.g., remove this method if we decide it makes more sense to do it differently. Two options I can see:

1. If we want to test purely that function, let's extract it to another class with a method like `determineExecutionMode(List<Transformation<?>>, RuntimeExecutionMode)`. Then it becomes an explicit contract of that other class, and we do not couple the `StreamGraphGenerator` with this additional method.
2. Test the effects of determining the mode, e.g. check the schedule mode, buffer timeout, etc., as this is the added contract of the `StreamGraphGenerator` class.
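Option 1 could look roughly like the following sketch. Everything in it is hypothetical: `ExecutionModeDetector`, `Mode` and `Node` are made-up stand-ins for Flink's `RuntimeExecutionMode` and `Transformation` so that the example compiles without Flink, and an explicit walk over each node's inputs replaces `getTransitivePredecessors()`. The resolution rule mirrors the quoted `determineExecutionMode`: an explicitly chosen mode is returned unchanged, while `AUTOMATIC` becomes `STREAMING` if any reachable source is continuous (not bounded) and `BATCH` otherwise.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
 * Hypothetical standalone home for the execution-mode detection (option 1).
 * Mode and Node are simplified stand-ins, not Flink classes.
 */
final class ExecutionModeDetector {

	/** Stand-in for RuntimeExecutionMode. */
	enum Mode { STREAMING, BATCH, AUTOMATIC }

	/** Stand-in for a Transformation: whether it is a continuous source, plus its inputs. */
	static final class Node {
		final boolean continuousSource;
		final List<Node> inputs;

		Node(boolean continuousSource, Node... inputs) {
			this.continuousSource = continuousSource;
			this.inputs = Arrays.asList(inputs);
		}
	}

	private ExecutionModeDetector() {
	}

	/**
	 * Returns an explicitly chosen mode unchanged; resolves AUTOMATIC to STREAMING
	 * if any node or transitive input is a continuous source, otherwise to BATCH.
	 */
	static Mode determineExecutionMode(List<Node> nodes, Mode chosenMode) {
		if (Objects.requireNonNull(chosenMode) != Mode.AUTOMATIC) {
			return chosenMode;
		}
		// Iterative walk over the nodes and their transitive inputs; each node is checked once.
		Set<Node> visited = new HashSet<>();
		Deque<Node> toVisit = new ArrayDeque<>(nodes);
		while (!toVisit.isEmpty()) {
			Node node = toVisit.pop();
			if (!visited.add(node)) {
				continue;
			}
			if (node.continuousSource) {
				return Mode.STREAMING;
			}
			toVisit.addAll(node.inputs);
		}
		return Mode.BATCH;
	}
}
```

With the logic in its own class, a test like `testDetectionThroughTransitivePredecessors` would target that class's explicit contract, and `StreamGraphGenerator` would only delegate to it.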