rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r477487377



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>> {

Review comment:
       This class duplicates quite a few parts of `ChainingOutput`.
   Why not reuse it by extending or delegating?
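   For illustration only, a rough sketch of the kind of split I mean: a hypothetical shared base class holding the common metric/watermark/output-tag state, with only the "push target" left to the subclasses. The names and the exact split are not from the PR, and the remaining `Output` methods are omitted.

```java
// Hypothetical sketch (not from the PR): the state shared by ChainingOutput and
// MultipleInputChainingOutput hoisted into one abstract base; each subclass
// only decides where a record is pushed (operator vs. chained input).
abstract class AbstractChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
	protected final Counter numRecordsIn;
	protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
	protected final StreamStatusProvider streamStatusProvider;
	@Nullable protected final OutputTag<T> outputTag;

	protected AbstractChainingOutput(
			Counter numRecordsIn,
			StreamStatusProvider streamStatusProvider,
			@Nullable OutputTag<T> outputTag) {
		this.numRecordsIn = numRecordsIn;
		this.streamStatusProvider = streamStatusProvider;
		this.outputTag = outputTag;
	}

	// ChainingOutput would forward to operator.processElement(...),
	// MultipleInputChainingOutput to input.processElement(...).
	protected abstract void pushToOperator(StreamRecord<T> record);
}
```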

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -628,4 +622,46 @@ public String toString() {
 
                return builder.toString();
        }
+
+       /**
+        * Interface representing chained inputs.
+        */
+       public static class Input implements Serializable {

Review comment:
       1. Rename `Input` to `InputConfig` (and its subclasses accordingly), since there is already `..api.operators.Input`?
   2. Make it an `interface` rather than a `class`? That would allow implementations to extend other classes and would ease testing (a possible shape is sketched after this list).
   3. Why is this class not parameterized? Its serializer is passed to parameterized classes.
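   A minimal sketch of that shape, nested in `StreamConfig`; the `InputConfig`/`NetworkInputConfig` names and the accessors are illustrative only, not taken from the PR:

```java
// Hypothetical sketch (assumes java.io.Serializable and
// org.apache.flink.api.common.typeutils.TypeSerializer are imported).
public interface InputConfig<T> extends Serializable {}

public static final class NetworkInputConfig<T> implements InputConfig<T> {
	private final TypeSerializer<T> typeSerializer;
	private final int inputIndex;

	public NetworkInputConfig(TypeSerializer<T> typeSerializer, int inputIndex) {
		this.typeSerializer = typeSerializer;
		this.inputIndex = inputIndex;
	}

	public TypeSerializer<T> getTypeSerializer() {
		return typeSerializer;
	}

	public int getInputIndex() {
		return inputIndex;
	}
}
```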

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +206,100 @@ public OperatorChain(
        OperatorChain(
                        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
                        RecordWriterOutput<?>[] streamOutputs,
-                       WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
chainEntryPoint,
-                       StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+                       WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
mainOperatorOutput,
+                       StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
                this.streamOutputs = checkNotNull(streamOutputs);
-               this.chainEntryPoint = checkNotNull(chainEntryPoint);
+               this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
                this.operatorEventDispatcher = null;
 
                checkState(allOperatorWrappers != null && 
allOperatorWrappers.size() > 0);
-               this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+               this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
                this.tailOperatorWrapper = allOperatorWrappers.get(0);
                this.numOperators = allOperatorWrappers.size();
+               this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-               linkOperatorWrappers(allOperatorWrappers);
+               firstOperatorWrapper = 
linkOperatorWrappers(allOperatorWrappers);
+       }
+
+       private void createChainOutputs(
+               List<StreamEdge> outEdgesInOrder,
+               RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> 
recordWriterDelegate,
+               Map<Integer, StreamConfig> chainedConfigs,
+               StreamTask<OUT, OP> containingTask,
+               Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {

Review comment:
       nit: indentation

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>> {
+       private static final Logger LOG = 
LoggerFactory.getLogger(MultipleInputChainingOutput.class);
+
+       protected final Input<T> input;
+       protected final Counter numRecordsIn;
+       protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+       protected final StreamStatusProvider streamStatusProvider;
+       @Nullable protected final OutputTag<T> outputTag;
+
+       public MultipleInputChainingOutput(
+                       Input<T> input,
+                       OperatorMetricGroup operatorMetricGroup,
+                       StreamStatusProvider streamStatusProvider,
+                       @Nullable OutputTag<T> outputTag) {
+               this.input = input;
+
+               {
+                       Counter tmpNumRecordsIn;
+                       try {
+                               OperatorIOMetricGroup ioMetricGroup = 
operatorMetricGroup.getIOMetricGroup();
+                               tmpNumRecordsIn = 
ioMetricGroup.getNumRecordsInCounter();
+                       } catch (Exception e) {
+                               LOG.warn("An exception occurred during the 
metrics setup.", e);
+                               tmpNumRecordsIn = new SimpleCounter();
+                       }
+                       numRecordsIn = tmpNumRecordsIn;
+               }
+
+               this.streamStatusProvider = streamStatusProvider;
+               this.outputTag = outputTag;
+       }
+
+       @Override
+       public void collect(StreamRecord<T> record) {
+               if (this.outputTag != null) {
+                       // we are not responsible for emitting to the main 
output.
+                       return;
+               }
+
+               pushToOperator(record);
+       }
+
+       @Override
+       public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) 
{
+               if (this.outputTag == null || 
!this.outputTag.equals(outputTag)) {

Review comment:
       `Objects.equals`? 
   I guess we should also process the record if both tags are null.
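   For example (a sketch; it assumes `java.util.Objects` is imported and that the unchecked cast plus `pushToOperator` mirror what the single-tag path already does):

```java
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
	// null-safe comparison: the record is also processed when both tags are null
	if (!Objects.equals(this.outputTag, outputTag)) {
		// we are not responsible for emitting to this (side) output
		return;
	}
	pushToOperator((StreamRecord<T>) record);
}
```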

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +206,100 @@ public OperatorChain(
        OperatorChain(
                        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
                        RecordWriterOutput<?>[] streamOutputs,
-                       WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
chainEntryPoint,
-                       StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+                       WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
mainOperatorOutput,
+                       StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
                this.streamOutputs = checkNotNull(streamOutputs);
-               this.chainEntryPoint = checkNotNull(chainEntryPoint);
+               this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
                this.operatorEventDispatcher = null;
 
                checkState(allOperatorWrappers != null && 
allOperatorWrappers.size() > 0);
-               this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+               this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
                this.tailOperatorWrapper = allOperatorWrappers.get(0);
                this.numOperators = allOperatorWrappers.size();
+               this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-               linkOperatorWrappers(allOperatorWrappers);
+               firstOperatorWrapper = 
linkOperatorWrappers(allOperatorWrappers);
+       }
+
+       private void createChainOutputs(
+               List<StreamEdge> outEdgesInOrder,
+               RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> 
recordWriterDelegate,
+               Map<Integer, StreamConfig> chainedConfigs,
+               StreamTask<OUT, OP> containingTask,
+               Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
+               for (int i = 0; i < outEdgesInOrder.size(); i++) {
+                       StreamEdge outEdge = outEdgesInOrder.get(i);
+
+                       RecordWriterOutput<?> streamOutput = createStreamOutput(
+                               recordWriterDelegate.getRecordWriter(i),
+                               outEdge,
+                               chainedConfigs.get(outEdge.getSourceId()),
+                               containingTask.getEnvironment());
+
+                       this.streamOutputs[i] = streamOutput;
+                       streamOutputMap.put(outEdge, streamOutput);
+               }
+       }
+
+       private ChainedSourceOutputs createChainedInputs(
+                       StreamTask<OUT, OP> containingTask,
+                       StreamConfig.Input[] configuredInputs,
+                       Map<Integer, StreamConfig> chainedConfigs,
+                       ClassLoader userCodeClassloader,
+                       List<StreamOperatorWrapper<?, ?>> allOpWrappers) {
+               if (Arrays.stream(configuredInputs).noneMatch(input -> input 
instanceof SourceInput)) {
+                       return new ChainedSourceOutputs();
+               }
+               checkState(
+                       mainOperatorWrapper.getStreamOperator() instanceof 
MultipleInputStreamOperator,

Review comment:
       Isn't this method called for any operator?
   The check will not return if there are `NetworkInput`s configured.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -160,81 +159,84 @@ public TimeCharacteristic getTimeCharacteristic() {
                }
        }
 
-       public void setTypeSerializersIn(TypeSerializer<?> ...serializers) {
-               config.setInteger(TYPE_SERIALIZERS_IN_COUNT, 
serializers.length);
-               for (int i = 0; i < serializers.length; i++) {
-                       
setTypeSerializer(String.format(TYPE_SERIALIZERS_IN_PATTERN, i), 
serializers[i]);
-               }
-       }
-
        public void setTypeSerializerOut(TypeSerializer<?> serializer) {
                setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
        }
 
+       public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
+               try {
+                       return 
InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
+               } catch (Exception e) {
+                       throw new StreamTaskException("Could not instantiate 
serializer.", e);
+               }
+       }
+
        public void setTypeSerializerSideOut(OutputTag<?> outputTag, 
TypeSerializer<?> serializer) {
                setTypeSerializer(TYPE_SERIALIZER_SIDEOUT_PREFIX + 
outputTag.getId(), serializer);
        }
 
-       @Deprecated
-       public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
-               return getTypeSerializerIn(0, cl);
+       private void setTypeSerializer(String key, TypeSerializer<?> 
typeWrapper) {
+               try {
+                       InstantiationUtil.writeObjectToConfig(typeWrapper, 
this.config, key);
+               } catch (IOException e) {
+                       throw new StreamTaskException("Could not serialize type 
serializer.", e);
+               }
        }
 
-       @Deprecated
-       public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
-               return getTypeSerializerIn(1, cl);
+       public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> 
outputTag, ClassLoader cl) {
+               Preconditions.checkNotNull(outputTag, "Side output id must not 
be null.");
+               try {
+                       return 
InstantiationUtil.readObjectFromConfig(this.config, 
TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
+               } catch (Exception e) {
+                       throw new StreamTaskException("Could not instantiate 
serializer.", e);
+               }
        }
 
-       public TypeSerializer<?>[] getTypeSerializersIn(ClassLoader cl) {
-               int typeSerializersCount = 
config.getInteger(TYPE_SERIALIZERS_IN_COUNT, -1);
-               checkState(
-                       typeSerializersCount >= 0,
-                       "Missing value for %s in the config? [%d]",
-                       TYPE_SERIALIZERS_IN_COUNT,
-                       typeSerializersCount);
-               TypeSerializer<?>[] typeSerializers = new 
TypeSerializer<?>[typeSerializersCount];
-               for (int i = 0; i < typeSerializers.length; i++) {
-                       typeSerializers[i] = getTypeSerializerIn(i, cl);
+       public void setTypeSerializersIn(TypeSerializer<?> ...serializers) {
+               Input[] inputs = new Input[serializers.length];
+               for (int i = 0; i < serializers.length; i++) {
+                       inputs[i] = new NetworkInput(serializers[i], i);
                }
-               return typeSerializers;
+               setInputs(inputs);
        }
 
-       public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader 
cl) {
+       public void setInputs(Input ...inputs) {
                try {
-                       return InstantiationUtil.readObjectFromConfig(
-                               this.config,
-                               String.format(TYPE_SERIALIZERS_IN_PATTERN, 
index),
-                               cl);
-               } catch (Exception e) {
-                       throw new StreamTaskException(
-                               String.format("Could not instantiate serializer 
for [%d] input.", index),
-                               e);
+                       InstantiationUtil.writeObjectToConfig(inputs, 
this.config, INPUTS);
+               } catch (IOException e) {
+                       throw new StreamTaskException("Could not serialize 
inputs.", e);
                }
        }
 
-       public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
+       public Input[] getInputs(ClassLoader cl) {
                try {
-                       return 
InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
+                       Input[] inputs = 
InstantiationUtil.readObjectFromConfig(this.config, INPUTS, cl);
+                       if (inputs == null) {
+                               return new Input[0];
+                       }
+                       return inputs;
                } catch (Exception e) {
-                       throw new StreamTaskException("Could not instantiate 
serializer.", e);
+                       throw new StreamTaskException("Could not deserialize 
inputs", e);
                }
        }
 
-       public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> 
outputTag, ClassLoader cl) {
-               Preconditions.checkNotNull(outputTag, "Side output id must not 
be null.");
-               try {
-                       return 
InstantiationUtil.readObjectFromConfig(this.config, 
TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
-               } catch (Exception e) {
-                       throw new StreamTaskException("Could not instantiate 
serializer.", e);
-               }
+       @Deprecated
+       public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+               return getTypeSerializerIn(0, cl);
        }
 
-       private void setTypeSerializer(String key, TypeSerializer<?> 
typeWrapper) {
-               try {
-                       InstantiationUtil.writeObjectToConfig(typeWrapper, 
this.config, key);
-               } catch (IOException e) {
-                       throw new StreamTaskException("Could not serialize type 
serializer.", e);
+       @Deprecated
+       public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+               return getTypeSerializerIn(1, cl);
+       }
+
+       public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader 
cl) {
+               Input[] inputs = getInputs(cl);
+               if (index >= inputs.length) {
+                       return null;

Review comment:
       Why can this happen?
   Should we return an `Optional` or mark it with `@Nullable`?
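   If the case is legitimate, one option is an `Optional`-returning accessor along these lines (a sketch; the method name and the `getTypeSerializer()` getter on `NetworkInput` are assumed here, not taken from the PR; requires `java.util.Optional`):

```java
@SuppressWarnings("unchecked")
public <T> Optional<TypeSerializer<T>> findTypeSerializerIn(int index, ClassLoader cl) {
	Input[] inputs = getInputs(cl);
	if (index >= inputs.length || !(inputs[index] instanceof NetworkInput)) {
		return Optional.empty();
	}
	// assumes NetworkInput exposes its serializer via a (hypothetical) getter
	return Optional.of((TypeSerializer<T>) ((NetworkInput) inputs[index]).getTypeSerializer());
}
```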

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -160,81 +159,84 @@ public TimeCharacteristic getTimeCharacteristic() {
                }
        }
 
-       public void setTypeSerializersIn(TypeSerializer<?> ...serializers) {
-               config.setInteger(TYPE_SERIALIZERS_IN_COUNT, 
serializers.length);
-               for (int i = 0; i < serializers.length; i++) {
-                       
setTypeSerializer(String.format(TYPE_SERIALIZERS_IN_PATTERN, i), 
serializers[i]);
-               }
-       }
-
        public void setTypeSerializerOut(TypeSerializer<?> serializer) {
                setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
        }
 
+       public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
+               try {
+                       return 
InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
+               } catch (Exception e) {
+                       throw new StreamTaskException("Could not instantiate 
serializer.", e);
+               }
+       }
+
        public void setTypeSerializerSideOut(OutputTag<?> outputTag, 
TypeSerializer<?> serializer) {
                setTypeSerializer(TYPE_SERIALIZER_SIDEOUT_PREFIX + 
outputTag.getId(), serializer);
        }
 
-       @Deprecated
-       public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
-               return getTypeSerializerIn(0, cl);
+       private void setTypeSerializer(String key, TypeSerializer<?> 
typeWrapper) {
+               try {
+                       InstantiationUtil.writeObjectToConfig(typeWrapper, 
this.config, key);
+               } catch (IOException e) {
+                       throw new StreamTaskException("Could not serialize type 
serializer.", e);
+               }
        }
 
-       @Deprecated
-       public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
-               return getTypeSerializerIn(1, cl);
+       public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> 
outputTag, ClassLoader cl) {
+               Preconditions.checkNotNull(outputTag, "Side output id must not 
be null.");
+               try {
+                       return 
InstantiationUtil.readObjectFromConfig(this.config, 
TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
+               } catch (Exception e) {
+                       throw new StreamTaskException("Could not instantiate 
serializer.", e);
+               }
        }
 
-       public TypeSerializer<?>[] getTypeSerializersIn(ClassLoader cl) {
-               int typeSerializersCount = 
config.getInteger(TYPE_SERIALIZERS_IN_COUNT, -1);
-               checkState(
-                       typeSerializersCount >= 0,
-                       "Missing value for %s in the config? [%d]",
-                       TYPE_SERIALIZERS_IN_COUNT,
-                       typeSerializersCount);
-               TypeSerializer<?>[] typeSerializers = new 
TypeSerializer<?>[typeSerializersCount];
-               for (int i = 0; i < typeSerializers.length; i++) {
-                       typeSerializers[i] = getTypeSerializerIn(i, cl);
+       public void setTypeSerializersIn(TypeSerializer<?> ...serializers) {
+               Input[] inputs = new Input[serializers.length];
+               for (int i = 0; i < serializers.length; i++) {
+                       inputs[i] = new NetworkInput(serializers[i], i);
                }
-               return typeSerializers;
+               setInputs(inputs);
        }
 
-       public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader 
cl) {
+       public void setInputs(Input ...inputs) {
                try {
-                       return InstantiationUtil.readObjectFromConfig(
-                               this.config,
-                               String.format(TYPE_SERIALIZERS_IN_PATTERN, 
index),
-                               cl);
-               } catch (Exception e) {
-                       throw new StreamTaskException(
-                               String.format("Could not instantiate serializer 
for [%d] input.", index),
-                               e);
+                       InstantiationUtil.writeObjectToConfig(inputs, 
this.config, INPUTS);
+               } catch (IOException e) {
+                       throw new StreamTaskException("Could not serialize 
inputs.", e);
                }
        }
 
-       public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
+       public Input[] getInputs(ClassLoader cl) {
                try {
-                       return 
InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
+                       Input[] inputs = 
InstantiationUtil.readObjectFromConfig(this.config, INPUTS, cl);
+                       if (inputs == null) {
+                               return new Input[0];
+                       }
+                       return inputs;
                } catch (Exception e) {
-                       throw new StreamTaskException("Could not instantiate 
serializer.", e);
+                       throw new StreamTaskException("Could not deserialize 
inputs", e);
                }
        }
 
-       public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> 
outputTag, ClassLoader cl) {
-               Preconditions.checkNotNull(outputTag, "Side output id must not 
be null.");
-               try {
-                       return 
InstantiationUtil.readObjectFromConfig(this.config, 
TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl);
-               } catch (Exception e) {
-                       throw new StreamTaskException("Could not instantiate 
serializer.", e);
-               }
+       @Deprecated
+       public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+               return getTypeSerializerIn(0, cl);
        }
 
-       private void setTypeSerializer(String key, TypeSerializer<?> 
typeWrapper) {
-               try {
-                       InstantiationUtil.writeObjectToConfig(typeWrapper, 
this.config, key);
-               } catch (IOException e) {
-                       throw new StreamTaskException("Could not serialize type 
serializer.", e);
+       @Deprecated
+       public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+               return getTypeSerializerIn(1, cl);
+       }
+
+       public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader 
cl) {
+               Input[] inputs = getInputs(cl);
+               if (index >= inputs.length) {
+                       return null;
                }
+               checkState(inputs[index] instanceof NetworkInput, "Input [%d] 
was assumed to be network input", index);

Review comment:
       I think `checkState` only recognizes the `%s` pattern.
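   If so (Flink's `Preconditions`, like Guava's, substitutes `%s` only), the placeholder would need to become:

```java
checkState(
	inputs[index] instanceof NetworkInput,
	"Input [%s] was assumed to be network input",
	index);
```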

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -53,42 +53,48 @@ public void init() throws Exception {
                StreamConfig configuration = getConfiguration();
                ClassLoader userClassLoader = getUserCodeClassLoader();
 
-               TypeSerializer<?>[] inputDeserializers = 
configuration.getTypeSerializersIn(userClassLoader);
+               StreamConfig.Input[] inputs = 
configuration.getInputs(userClassLoader);
 
-               ArrayList<IndexedInputGate>[] inputLists = new 
ArrayList[inputDeserializers.length];
-               WatermarkGauge[] watermarkGauges = new 
WatermarkGauge[inputDeserializers.length];
+               ArrayList<IndexedInputGate>[] inputLists = new ArrayList[
+                       (int) Arrays.stream(inputs)
+                               .filter(input -> (input instanceof 
StreamConfig.NetworkInput))
+                               .count()];

Review comment:
       Isn't it the same as `configuration.getNumberOfNetworkInputs` below?
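   i.e. the allocation could presumably be simplified to (assuming `getNumberOfNetworkInputs()` counts exactly the configured `NetworkInput`s):

```java
ArrayList<IndexedInputGate>[] inputLists = new ArrayList[configuration.getNumberOfNetworkInputs()];
```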

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -547,302 +675,38 @@ private void 
linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> allOperatorW
                return (tailOperatorWrapper == null) ? null : 
tailOperatorWrapper.getStreamOperator();
        }
 
-       // 
------------------------------------------------------------------------
-       //  Collectors for output chaining
-       // 
------------------------------------------------------------------------
-
        /**
-        * An {@link Output} that measures the last emitted watermark with a 
{@link WatermarkGauge}.
-        *
-        * @param <T> The type of the elements that can be emitted.
+        * Wrapper class to access the chained sources and their's outputs.
         */
-       public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
-               Gauge<Long> getWatermarkGauge();
-       }
-
-       static class ChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>> {
-
-               protected final OneInputStreamOperator<T, ?> operator;
-               protected final Counter numRecordsIn;
-               protected final WatermarkGauge watermarkGauge = new 
WatermarkGauge();
-
-               protected final StreamStatusProvider streamStatusProvider;
-
-               @Nullable
-               protected final OutputTag<T> outputTag;
-
-               public ChainingOutput(
-                               OneInputStreamOperator<T, ?> operator,
-                               StreamStatusProvider streamStatusProvider,
-                               @Nullable OutputTag<T> outputTag) {
-                       this.operator = operator;
-
-                       {
-                               Counter tmpNumRecordsIn;
-                               try {
-                                       OperatorIOMetricGroup ioMetricGroup = 
((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup();
-                                       tmpNumRecordsIn = 
ioMetricGroup.getNumRecordsInCounter();
-                               } catch (Exception e) {
-                                       LOG.warn("An exception occurred during 
the metrics setup.", e);
-                                       tmpNumRecordsIn = new SimpleCounter();
-                               }
-                               numRecordsIn = tmpNumRecordsIn;
-                       }
+       public static class ChainedSourceOutputs {
+               private final Map<Integer, 
WatermarkGaugeExposingOutput<StreamRecord<?>>> chainedSourceOutputs;
+               private final Map<Integer, SourceOperator<?, ?>> 
sourceOperators;

Review comment:
       I think having a single map at the `OperatorChain` level, with values holding a pair of Output + Operator, would be easier to read and more efficient (by pair I mean a class, named maybe `ChainedSourceOutputs`).
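   For illustration, the pair could look roughly like this (the class and accessor names are only a suggestion, not from the PR):

```java
/** Holds a chained source operator together with the output it is wired to. */
public static final class ChainedSource {
	private final WatermarkGaugeExposingOutput<StreamRecord<?>> sourceOutput;
	private final SourceOperator<?, ?> sourceOperator;

	ChainedSource(
			WatermarkGaugeExposingOutput<StreamRecord<?>> sourceOutput,
			SourceOperator<?, ?> sourceOperator) {
		this.sourceOutput = sourceOutput;
		this.sourceOperator = sourceOperator;
	}

	public WatermarkGaugeExposingOutput<StreamRecord<?>> getSourceOutput() {
		return sourceOutput;
	}

	public SourceOperator<?, ?> getSourceOperator() {
		return sourceOperator;
	}
}
```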

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -208,19 +206,100 @@ public OperatorChain(
        OperatorChain(
                        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
                        RecordWriterOutput<?>[] streamOutputs,
-                       WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
chainEntryPoint,
-                       StreamOperatorWrapper<OUT, OP> headOperatorWrapper) {
+                       WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
mainOperatorOutput,
+                       StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
 
                this.streamOutputs = checkNotNull(streamOutputs);
-               this.chainEntryPoint = checkNotNull(chainEntryPoint);
+               this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
                this.operatorEventDispatcher = null;
 
                checkState(allOperatorWrappers != null && 
allOperatorWrappers.size() > 0);
-               this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+               this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
                this.tailOperatorWrapper = allOperatorWrappers.get(0);
                this.numOperators = allOperatorWrappers.size();
+               this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-               linkOperatorWrappers(allOperatorWrappers);
+               firstOperatorWrapper = 
linkOperatorWrappers(allOperatorWrappers);
+       }
+
+       private void createChainOutputs(
+               List<StreamEdge> outEdgesInOrder,
+               RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> 
recordWriterDelegate,
+               Map<Integer, StreamConfig> chainedConfigs,
+               StreamTask<OUT, OP> containingTask,
+               Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
+               for (int i = 0; i < outEdgesInOrder.size(); i++) {
+                       StreamEdge outEdge = outEdgesInOrder.get(i);
+
+                       RecordWriterOutput<?> streamOutput = createStreamOutput(
+                               recordWriterDelegate.getRecordWriter(i),
+                               outEdge,
+                               chainedConfigs.get(outEdge.getSourceId()),
+                               containingTask.getEnvironment());
+
+                       this.streamOutputs[i] = streamOutput;
+                       streamOutputMap.put(outEdge, streamOutput);
+               }
+       }
+
+       private ChainedSourceOutputs createChainedInputs(

 Review comment:
       Something is wrong with the return type or the name here :)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -547,302 +675,38 @@ private void 
linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> allOperatorW
                return (tailOperatorWrapper == null) ? null : 
tailOperatorWrapper.getStreamOperator();
        }
 
-       // 
------------------------------------------------------------------------
-       //  Collectors for output chaining
-       // 
------------------------------------------------------------------------
-
        /**
-        * An {@link Output} that measures the last emitted watermark with a 
{@link WatermarkGauge}.
-        *
-        * @param <T> The type of the elements that can be emitted.
+        * Wrapper class to access the chained sources and their's outputs.
         */
-       public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
-               Gauge<Long> getWatermarkGauge();
-       }
-
-       static class ChainingOutput<T> implements 
WatermarkGaugeExposingOutput<StreamRecord<T>> {
-
-               protected final OneInputStreamOperator<T, ?> operator;
-               protected final Counter numRecordsIn;
-               protected final WatermarkGauge watermarkGauge = new 
WatermarkGauge();
-
-               protected final StreamStatusProvider streamStatusProvider;
-
-               @Nullable
-               protected final OutputTag<T> outputTag;
-
-               public ChainingOutput(
-                               OneInputStreamOperator<T, ?> operator,
-                               StreamStatusProvider streamStatusProvider,
-                               @Nullable OutputTag<T> outputTag) {
-                       this.operator = operator;
-
-                       {
-                               Counter tmpNumRecordsIn;
-                               try {
-                                       OperatorIOMetricGroup ioMetricGroup = 
((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup();
-                                       tmpNumRecordsIn = 
ioMetricGroup.getNumRecordsInCounter();
-                               } catch (Exception e) {
-                                       LOG.warn("An exception occurred during 
the metrics setup.", e);
-                                       tmpNumRecordsIn = new SimpleCounter();
-                               }
-                               numRecordsIn = tmpNumRecordsIn;
-                       }
+       public static class ChainedSourceOutputs {
+               private final Map<Integer, 
WatermarkGaugeExposingOutput<StreamRecord<?>>> chainedSourceOutputs;
+               private final Map<Integer, SourceOperator<?, ?>> 
sourceOperators;

Review comment:
       The map key is the index of the input in `StreamConfig`, right? This seems a bit fragile and not obvious.
   How about keying by `Input` instances? I see we have them available at both put and get.
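   Sketched with the pair class from the previous comment (assuming the same deserialized `Input` instances are used at both put and get, so identity-based keying is sufficient):

```java
// single map on OperatorChain, keyed by the configured input instead of its index
private final Map<StreamConfig.Input, ChainedSource> chainedSources = new HashMap<>();
```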

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -90,16 +91,19 @@
 
        private final RecordWriterOutput<?>[] streamOutputs;
 
-       private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
chainEntryPoint;
+       private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> 
mainOperatorOutput;
 
        /**
         * For iteration, {@link StreamIterationHead} and {@link 
StreamIterationTail} used for executing
-        * feedback edges do not contain any operators, in which case, {@code 
headOperatorWrapper} and
+        * feedback edges do not contain any operators, in which case, {@code 
mainOperatorWrapper} and
         * {@code tailOperatorWrapper} are null.
         */
-       @Nullable private final StreamOperatorWrapper<OUT, OP> 
headOperatorWrapper;
+       @Nullable private final StreamOperatorWrapper<OUT, OP> 
mainOperatorWrapper;
+       @Nullable private final StreamOperatorWrapper<?, ?> 
firstOperatorWrapper;

Review comment:
       Why do we need `first`? Isn't it the same as `main`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

