dawidwys commented on a change in pull request #13502:
URL: https://github.com/apache/flink/pull/13502#discussion_r496478478



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/RuntimeExecutionMode.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The runtime execution mode based on which Flink will pick
+ * how to schedule the tasks of a given job and what will be the default semantics.

Review comment:
       nit: Personally, I don't like making the scheduling part so prominent 
here. It's just one of many characteristics.
   
   How about something along the lines of: `The setting determines the runtime 
behaviour, including, but not limited to, scheduling, time handling (whether 
there will be late data), incremental updates, etc.` or something else that 
makes the scheduling less prominent.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -216,13 +204,74 @@ public StreamGraph generate() {
                return builtStreamGraph;
        }
 
+       @VisibleForTesting
+       RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode chosenMode) {
+               if (checkNotNull(chosenMode) != RuntimeExecutionMode.AUTOMATIC) {
+                       return chosenMode;
+               }
+
+               final boolean continuousSourceExists = transformations
+                               .stream()
+                               .anyMatch(transformation ->
+                                               isContinuousSource(transformation) ||
+                                               transformation
+                                                               .getTransitivePredecessors()
+                                                               .stream()
+                                                               .anyMatch(this::isContinuousSource));
+
+               return continuousSourceExists
+                               ? RuntimeExecutionMode.STREAMING
+                               : RuntimeExecutionMode.BATCH;
+       }
+
+       private boolean isContinuousSource(final Transformation<?> transformation) {
+               checkNotNull(transformation);
+               return transformation instanceof WithBoundedness &&
+                               ((WithBoundedness) transformation).getBoundedness() != Boundedness.BOUNDED;
+       }
+
+       private void configureStreamGraph(
+                       final StreamGraph graph,
+                       final RuntimeExecutionMode runtimeExecutionMode) {
+               checkNotNull(graph);
+               checkNotNull(runtimeExecutionMode);
+
+               // by now we must have disambiguated the runtime-mode in which to execute the pipeline.
+               verifyRuntimeModeIsSet();

Review comment:
       Do we need to verify the `runtime mode` here? I think it would be better 
to move the check to the `generate` method.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/RuntimeExecutionMode.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The runtime execution mode based on which Flink will pick
+ * how to schedule the tasks of a given job and what will be the default semantics.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API">
+ *     https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API</a>
+ */
+@Internal
+public enum RuntimeExecutionMode {
+       STREAMING,

Review comment:
       Before making it `Public` we should add javadocs for the options.
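
   As a rough sketch of what such Javadocs could look like: the wording below 
is illustrative only, derived from what `determineExecutionMode` and 
`configureStreamGraph` do in this diff. The `AUTOMATIC` constant is taken from 
its use there, and the class-level text follows the phrasing suggested in the 
first comment.

```java
/**
 * The setting determines the runtime behaviour, including, but not limited to,
 * scheduling, time handling and incremental updates.
 *
 * <p>Sketch only: the per-constant descriptions are assumptions based on the
 * diff under review, not the final documentation.
 */
enum RuntimeExecutionMode {

    /**
     * Streaming semantics. In this diff the mode maps to eager scheduling
     * with all edges pipelined.
     */
    STREAMING,

    /**
     * Batch semantics. In this diff the mode maps to lazy-from-sources
     * scheduling and sets the default buffer timeout to -1.
     */
    BATCH,

    /**
     * Lets the generator decide: STREAMING if any source is not bounded,
     * BATCH otherwise.
     */
    AUTOMATIC
}
```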

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -216,13 +204,74 @@ public StreamGraph generate() {
                return builtStreamGraph;
        }
 
+       @VisibleForTesting
+       RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode chosenMode) {
+               if (checkNotNull(chosenMode) != RuntimeExecutionMode.AUTOMATIC) {
+                       return chosenMode;
+               }
+
+               final boolean continuousSourceExists = transformations
+                               .stream()
+                               .anyMatch(transformation ->
+                                               isContinuousSource(transformation) ||
+                                               transformation
+                                                               .getTransitivePredecessors()
+                                                               .stream()
+                                                               .anyMatch(this::isContinuousSource));
+
+               return continuousSourceExists
+                               ? RuntimeExecutionMode.STREAMING
+                               : RuntimeExecutionMode.BATCH;
+       }
+
+       private boolean isContinuousSource(final Transformation<?> transformation) {
+               checkNotNull(transformation);
+               return transformation instanceof WithBoundedness &&
+                               ((WithBoundedness) transformation).getBoundedness() != Boundedness.BOUNDED;
+       }
+
+       private void configureStreamGraph(
+                       final StreamGraph graph,
+                       final RuntimeExecutionMode runtimeExecutionMode) {
+               checkNotNull(graph);
+               checkNotNull(runtimeExecutionMode);
+
+               // by now we must have disambiguated the runtime-mode in which to execute the pipeline.
+               verifyRuntimeModeIsSet();
+
+               graph.setStateBackend(stateBackend);
+               graph.setChaining(chaining);
+               graph.setUserArtifacts(userArtifacts);
+               graph.setTimeCharacteristic(timeCharacteristic);
+               graph.setJobName(jobName);
+
+               switch (runtimeExecutionMode) {
+                       case STREAMING:
+                               graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
+                               graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+                               graph.setScheduleMode(ScheduleMode.EAGER);
+                               break;
+                       case BATCH:
+                               // TODO: 24.09.20 copied from ExecutorUtils.setBatchProperties(StreamGraph...). Ask if correct!!!
+                               graph.setAllVerticesInSameSlotSharingGroupByDefault(false);
+                               graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
+                               graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
+                               setDefaultBufferTimeout(-1);
+                               break;
+                       default:
+                               throw new IllegalArgumentException("Unknown execution mode");
+               }
+       }
+
        /**
         * Transforms one {@code Transformation}.
         *
         * <p>This checks whether we already transformed it and exits early in that case. If not it
         * delegates to one of the transformation specific methods.
         */
        private Collection<Integer> transform(Transformation<?> transform) {
+               // by now we must have disambiguated the runtime-mode in which to execute the pipeline.
+               verifyRuntimeModeIsSet();

Review comment:
       nit: I am wondering if that's not too defensive. It is a private method 
with a single entry point from `generate`; can't we check it only there? This 
method will be called multiple times, recursively, for all transformations. I 
am not sure it isn't overkill.
   
   What do you think about moving the check to the `generate` method?
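
   A minimal, self-contained sketch of the idea, assuming hypothetical stand-in 
types (`GeneratorSketch`, `Node` and `Mode` are not Flink classes): the 
precondition runs once in `generate`, and the recursive `transform` relies on it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the generator; not Flink's StreamGraphGenerator.
class GeneratorSketch {
    enum Mode { STREAMING, BATCH, AUTOMATIC }

    // Hypothetical stand-in for a transformation and its inputs.
    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    private final Mode mode;
    int checks = 0; // how many times the precondition was evaluated

    GeneratorSketch(Mode mode) {
        this.mode = mode;
    }

    // Single entry point: verify the mode once, then recurse freely.
    List<String> generate(Node sink) {
        verifyRuntimeModeIsSet();
        List<String> visited = new ArrayList<>();
        transform(sink, visited);
        return visited;
    }

    private void verifyRuntimeModeIsSet() {
        checks++;
        if (mode == null || mode == Mode.AUTOMATIC) {
            throw new IllegalStateException("Runtime execution mode has not been resolved.");
        }
    }

    // Called recursively for every input; no precondition check is repeated here.
    private void transform(Node node, List<String> visited) {
        for (Node input : node.inputs) {
            transform(input, visited);
        }
        visited.add(node.name);
    }
}
```

   With a three-node chain, `generate` visits every node but the `checks` 
counter stays at one, which is the behaviour the comment asks for.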

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorRuntimeExecutionModeDetection.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link StreamGraphGenerator#determineExecutionMode(RuntimeExecutionMode)}.
+ */
+public class StreamGraphGeneratorRuntimeExecutionModeDetection extends TestLogger {
+
+       @Test
+       public void testDetectionThroughTransitivePredecessors() {
+               final SourceTransformation<Integer> bounded =
+                               getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
+               assertEquals(Boundedness.BOUNDED, bounded.getBoundedness());

Review comment:
       Not sure about this assertion. Are we testing 
`getSourceTransformation` here? It distracts from the method we intend to 
test, imo.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
##########
@@ -51,7 +51,9 @@ public Pipeline createPipeline(List<Transformation<?>> transformations, TableCon
                        throw new IllegalStateException("No operators defined in streaming topology. Cannot generate StreamGraph.");
                }
                return new StreamGraphGenerator(
-                       transformations, executionEnvironment.getConfig(), executionEnvironment.getCheckpointConfig())

Review comment:
       nit: unrelated change

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -216,13 +204,74 @@ public StreamGraph generate() {
                return builtStreamGraph;
        }
 
+       @VisibleForTesting
+       RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode chosenMode) {
+               if (checkNotNull(chosenMode) != RuntimeExecutionMode.AUTOMATIC) {
+                       return chosenMode;
+               }
+
+               final boolean continuousSourceExists = transformations
+                               .stream()
+                               .anyMatch(transformation ->
+                                               isContinuousSource(transformation) ||
+                                               transformation
+                                                               .getTransitivePredecessors()
+                                                               .stream()
+                                                               .anyMatch(this::isContinuousSource));
+
+               return continuousSourceExists
+                               ? RuntimeExecutionMode.STREAMING
+                               : RuntimeExecutionMode.BATCH;
+       }
+
+       private boolean isContinuousSource(final Transformation<?> transformation) {
+               checkNotNull(transformation);
+               return transformation instanceof WithBoundedness &&
+                               ((WithBoundedness) transformation).getBoundedness() != Boundedness.BOUNDED;
+       }
+
+       private void configureStreamGraph(
+                       final StreamGraph graph,
+                       final RuntimeExecutionMode runtimeExecutionMode) {
+               checkNotNull(graph);
+               checkNotNull(runtimeExecutionMode);
+
+               // by now we must have disambiguated the runtime-mode in which to execute the pipeline.
+               verifyRuntimeModeIsSet();
+
+               graph.setStateBackend(stateBackend);
+               graph.setChaining(chaining);
+               graph.setUserArtifacts(userArtifacts);
+               graph.setTimeCharacteristic(timeCharacteristic);
+               graph.setJobName(jobName);
+
+               switch (runtimeExecutionMode) {
+                       case STREAMING:
+                               graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
+                               graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+                               graph.setScheduleMode(ScheduleMode.EAGER);
+                               break;
+                       case BATCH:
+                               // TODO: 24.09.20 copied from ExecutorUtils.setBatchProperties(StreamGraph...). Ask if correct!!!

Review comment:
       I think it is correct ;)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -216,13 +204,74 @@ public StreamGraph generate() {
                return builtStreamGraph;
        }
 
+       @VisibleForTesting
+       RuntimeExecutionMode determineExecutionMode(final RuntimeExecutionMode chosenMode) {

Review comment:
       nit: Could we test the PR differently? This method is not a contract of 
the class, but by writing tests for this method we are making it one. With the 
tests in place it will be harder to e.g. remove this method, if we decide it 
makes more sense to do it differently.
   
   Two options I can see:
   1. If we want to test purely that function, let's extract it to another 
class with a method like: `determineExecutionMode(List<Transformation<?>>, 
RuntimeExecutionMode)`. Then we make it an explicit contract of that other 
class and we do not couple the `StreamGraph` with this additional method.
   2. Test the effects of determining the mode, e.g. check the schedule mode, 
buffer timeout etc., as this is the added contract of the 
`StreamGraphGenerator` class.
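
   Option 1 could look roughly like the sketch below. It is self-contained 
and uses hypothetical stand-ins: `ExecutionModeResolver`, `Node`, and the 
nested `Mode` and `Boundedness` enums are assumptions for illustration, not 
existing Flink classes. The logic mirrors `determineExecutionMode` and 
`isContinuousSource` from the diff.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

// Hypothetical helper class; the name and the stand-in types are assumptions.
final class ExecutionModeResolver {
    enum Mode { STREAMING, BATCH, AUTOMATIC }
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    // Stand-in for Transformation; boundedness is null for non-source nodes.
    static final class Node {
        final Boundedness boundedness;
        final List<Node> inputs;

        Node(Boundedness boundedness, Node... inputs) {
            this.boundedness = boundedness;
            this.inputs = Arrays.asList(inputs);
        }
    }

    private ExecutionModeResolver() {}

    // An explicit choice wins; AUTOMATIC is resolved from the sources.
    static Mode determineExecutionMode(Collection<Node> transformations, Mode chosenMode) {
        Objects.requireNonNull(transformations);
        if (Objects.requireNonNull(chosenMode) != Mode.AUTOMATIC) {
            return chosenMode;
        }
        boolean continuousSourceExists =
                transformations.stream().anyMatch(ExecutionModeResolver::reachesContinuousSource);
        return continuousSourceExists ? Mode.STREAMING : Mode.BATCH;
    }

    // True if the node, or anything upstream of it, is a non-bounded source.
    private static boolean reachesContinuousSource(Node node) {
        boolean isContinuousSource =
                node.boundedness != null && node.boundedness != Boundedness.BOUNDED;
        return isContinuousSource
                || node.inputs.stream().anyMatch(ExecutionModeResolver::reachesContinuousSource);
    }
}
```

   Tests can then target the helper's static method directly, and the 
generator no longer needs a `@VisibleForTesting` method of its own.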






