aljoscha commented on a change in pull request #13556:
URL: https://github.com/apache/flink/pull/13556#discussion_r501802045



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link 
Transformation}
+ * to its runtime implementation depending on the execution mode it is being 
executed.
+ * Currently, the execution mode can be either
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING 
STREAMING} or
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH}.

Review comment:
       I think we don't need to refer to the API-facing enum here; it's enough 
to know that we translate either for batch or streaming, especially since the 
enum technically also has `AUTOMATIC`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link 
Transformation}

Review comment:
       Should be `{@link StreamGraphTranslator}` or `{@code 
StreamGraphTranslator}`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link 
Transformation}
+ * to its runtime implementation depending on the execution mode it is being 
executed.

Review comment:
       ```suggestion
    * to its runtime implementation depending on the execution mode.
   ```

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link 
Transformation}
+ * to its runtime implementation depending on the execution mode it is being 
executed.
+ * Currently, the execution mode can be either
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING 
STREAMING} or
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH}.
+ *
+ * @param <OUT> The type of the output elements of the transformation being 
translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public interface StreamGraphTranslator<OUT, T extends Transformation<OUT>> {
+
+       /**
+        * Translates a given {@link Transformation} to its runtime 
implementation for
+        * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH 
BATCH-style} execution.

Review comment:
       Same as above about `RuntimeExecutionMode`

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/BaseSimpleTransformationTranslator.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A base class for all {@link StreamGraphTranslator StreamGraphTranslators} 
who translate
+ * {@link Transformation Transformations} that have a single operator in their 
runtime implementation.
+ * These include most of the currently supported operations.
+ *
+ * @param <OUT> The type of the output elements of the transformation being 
translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public abstract class BaseSimpleTransformationTranslator<OUT, T extends 
Transformation<OUT>>

Review comment:
       Maybe `SingleOperatorTranslator` would be more concise. 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphTranslator.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+
+import java.util.Collection;
+
+/**
+ * A {@code Translator} is responsible for translating a given {@link 
Transformation}
+ * to its runtime implementation depending on the execution mode it is being 
executed.
+ * Currently, the execution mode can be either
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING 
STREAMING} or
+ * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH BATCH}.
+ *
+ * @param <OUT> The type of the output elements of the transformation being 
translated.
+ * @param <T> The type of transformation being translated.
+ */
+@Internal
+public interface StreamGraphTranslator<OUT, T extends Transformation<OUT>> {
+
+       /**
+        * Translates a given {@link Transformation} to its runtime 
implementation for
+        * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#BATCH 
BATCH-style} execution.
+        *
+        * @param transformation The transformation to be translated.
+        * @param context The translation context.
+        * @return The ids of the nodes corresponding to this transformation in 
the transformation graph.
+        */
+       Collection<Integer> translateForBatch(
+                       final T transformation,
+                       final Context context);
+
+       /**
+        * Translates a given {@link Transformation} to its runtime 
implementation for
+        * {@link org.apache.flink.streaming.api.RuntimeExecutionMode#STREAMING 
STREAMING-style} execution.
+        *
+        * @param transformation The transformation to be translated.
+        * @param context The translation context.
+        * @return The ids of the nodes corresponding to this transformation in 
the transformation graph.
+        */
+       Collection<Integer> translateForStreaming(
+                       final T transformation,
+                       final Context context);
+
+       /**
+        * A context giving the necessary information for the translation of a 
given transformation.
+        */
+       interface Context {
+
+               /**
+                * Returns the {@link StreamGraph} being created as the 
transformations
+                * of a pipeline are translated to their runtime 
implementations.
+                */
+               StreamGraph getStreamGraph();
+
+               /**
+                * Returns the ids of the nodes in the {@link StreamGraph} 
corresponding to the
+                * provided transformation.
+                *
+                * @param transformation the transformation whose nodes' ids we 
want.
+                * @return The requested ids.
+                */
+               Collection<Integer> getTransformationIds(final 
Transformation<?> transformation);

Review comment:
       Should maybe be `getStreamNodeIds()`, because it doesn't return 
transformation IDs.
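
       To make the naming point concrete, here is a minimal sketch. 
`FakeTransformation` and `MapBackedContext` are hypothetical stand-ins invented 
for illustration, not Flink classes, and the ids are made up. It only shows 
that the ids handed back belong to stream nodes and need not match the id of 
the transformation used for the lookup.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch only: FakeTransformation and MapBackedContext are hypothetical stand-ins.
class StreamNodeIdsSketch {

    /** Hypothetical stand-in for a Transformation; it only carries its own id. */
    static final class FakeTransformation {
        final int id;

        FakeTransformation(int id) {
            this.id = id;
        }
    }

    /** The translation context, with the lookup named after what it returns. */
    interface Context {
        Collection<Integer> getStreamNodeIds(FakeTransformation transformation);
    }

    /** Hypothetical map-backed context, used only for this illustration. */
    static final class MapBackedContext implements Context {
        private final Map<FakeTransformation, Collection<Integer>> nodeIds = new HashMap<>();

        void register(FakeTransformation transformation, Collection<Integer> ids) {
            nodeIds.put(transformation, ids);
        }

        @Override
        public Collection<Integer> getStreamNodeIds(FakeTransformation transformation) {
            // Unknown transformations simply have no nodes in this sketch.
            return nodeIds.getOrDefault(transformation, Collections.emptyList());
        }
    }

    public static void main(String[] args) {
        MapBackedContext context = new MapBackedContext();
        // A transformation with id 7 that is represented by stream nodes 3 and 4.
        FakeTransformation union = new FakeTransformation(7);
        context.register(union, Arrays.asList(3, 4));
        System.out.println(context.getStreamNodeIds(union));
    }
}
```

Here the lookup for the transformation with id 7 yields node ids 3 and 4, which 
is why a name mentioning stream nodes reads more accurately.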

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -782,16 +797,33 @@ private boolean isUnboundedSource(final Transformation<?> 
transformation) {
                for (int i = 0; i < allInputIds.size(); i++) {
                        Collection<Integer> inputIds = allInputIds.get(i);
                        for (Integer inputId: inputIds) {
-                               streamGraph.addEdge(inputId,
-                                       transform.getId(),
-                                       i + 1
-                               );
+                               streamGraph.addEdge(inputId, transform.getId(), 
i + 1);
                        }
                }
 
                return Collections.singleton(transform.getId());
        }
 
+       /**
+        * Returns a list of lists containing the ids of the nodes in the 
transformation graph
+        * that correspond to the provided transformations. Each transformation 
may have multiple nodes.

Review comment:
       Maybe add here that `Transformations` will be translated if they are not 
already translated.
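
       The behaviour worth documenting could be sketched roughly as below. 
This is not the actual `StreamGraphGenerator` code: `Node`, `transform`, 
`alreadyTransformed` and `translateCalls` are names assumed for illustration. 
The point is that asking for the parents' node ids translates any parent that 
has not been translated yet and reuses the cached result otherwise.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: a hypothetical, heavily simplified generator.
class LazyTranslationSketch {

    /** Hypothetical stand-in for a Transformation: an id plus its inputs. */
    static final class Node {
        final int id;
        final List<Node> inputs;

        Node(int id, List<Node> inputs) {
            this.id = id;
            this.inputs = inputs;
        }
    }

    // Results of earlier translations, keyed by transformation (identity-based).
    private final Map<Node, Collection<Integer>> alreadyTransformed = new HashMap<>();

    // Counts real translations so the caching is observable.
    int translateCalls = 0;

    /** Translates the node unless it was translated before. */
    Collection<Integer> transform(Node node) {
        Collection<Integer> cached = alreadyTransformed.get(node);
        if (cached != null) {
            return cached;
        }
        translateCalls++;
        // Inputs are translated first, as a side effect of asking for their ids.
        getParentInputIds(node.inputs);
        Collection<Integer> ids = Collections.singleton(node.id);
        alreadyTransformed.put(node, ids);
        return ids;
    }

    /** Returns one collection of node ids per parent, translating parents on demand. */
    List<Collection<Integer>> getParentInputIds(List<Node> parents) {
        List<Collection<Integer>> allInputIds = new ArrayList<>();
        for (Node parent : parents) {
            allInputIds.add(transform(parent));
        }
        return allInputIds;
    }

    public static void main(String[] args) {
        LazyTranslationSketch generator = new LazyTranslationSketch();
        Node source = new Node(1, Collections.emptyList());
        Node map = new Node(2, Collections.singletonList(source));
        System.out.println(generator.getParentInputIds(Arrays.asList(source, map)));
        System.out.println(generator.translateCalls);
    }
}
```

In this sketch the source is translated once even though it is reached twice 
(directly and as the input of the map), which is the side effect the Javadoc 
could mention.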

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTranslator.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.translators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.graph.BaseSimpleTransformationTranslator;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.TransformationTranslator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TransformationTranslator} for the {@link OneInputTransformation}.
+ *
+ * @param <IN> The type of the elements in the input {@code Transformation} of 
the transformation to translate.
+ * @param <OUT> The type of the elements that result from the provided {@code 
OneInputTransformation}.
+ */
+@Internal
+public class OneInputTranslator<IN, OUT> extends 
BaseSimpleTransformationTranslator<OUT, OneInputTransformation<IN, OUT>> {

Review comment:
       Could be `OneInputTransformationTranslator`. It better fits the naming 
scheme but I do realise that it's a bit long... 😅





