rkhachatryan commented on a change in pull request #13234: URL: https://github.com/apache/flink/pull/13234#discussion_r477487377
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider; +import org.apache.flink.util.OutputTag; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> { Review comment: This class duplicates quite some parts of 
`ChainingOutput`. Why not re-use it by extending/delegating? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ########## @@ -628,4 +622,46 @@ public String toString() { return builder.toString(); } + + /** + * Interface representing chained inputs. + */ + public static class Input implements Serializable { Review comment: 1. Rename `Input` to `InputConfig` as there is `..api.operators.Input` already? (and subclasses) 2. Make it `interface` and not `class`? This will allow children to extend other classes and ease testing 3. Why is this class not parameterized? Its serializer is passed to parameterized classes ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ########## @@ -208,19 +206,100 @@ public OperatorChain( OperatorChain( List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, RecordWriterOutput<?>[] streamOutputs, - WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint, - StreamOperatorWrapper<OUT, OP> headOperatorWrapper) { + WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput, + StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) { this.streamOutputs = checkNotNull(streamOutputs); - this.chainEntryPoint = checkNotNull(chainEntryPoint); + this.mainOperatorOutput = checkNotNull(mainOperatorOutput); this.operatorEventDispatcher = null; checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0); - this.headOperatorWrapper = checkNotNull(headOperatorWrapper); + this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper); this.tailOperatorWrapper = allOperatorWrappers.get(0); this.numOperators = allOperatorWrappers.size(); + this.chainedSourceOutputs = new ChainedSourceOutputs(); - linkOperatorWrappers(allOperatorWrappers); + firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers); + } + + private void createChainOutputs( + List<StreamEdge> outEdgesInOrder, + 
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate, + Map<Integer, StreamConfig> chainedConfigs, + StreamTask<OUT, OP> containingTask, + Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) { Review comment: nit: indentation ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider; +import org.apache.flink.util.OutputTag; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> { + private static final Logger LOG = LoggerFactory.getLogger(MultipleInputChainingOutput.class); + + protected final Input<T> input; + protected final Counter numRecordsIn; + protected final WatermarkGauge watermarkGauge = new WatermarkGauge(); + protected final StreamStatusProvider streamStatusProvider; + @Nullable protected final OutputTag<T> outputTag; + + public MultipleInputChainingOutput( + Input<T> input, + OperatorMetricGroup operatorMetricGroup, + StreamStatusProvider streamStatusProvider, + @Nullable OutputTag<T> outputTag) { + this.input = input; + + { + Counter tmpNumRecordsIn; + try { + OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup(); + tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter(); + } catch (Exception e) { + LOG.warn("An exception occurred during the metrics setup.", e); + tmpNumRecordsIn = new SimpleCounter(); + } + numRecordsIn = tmpNumRecordsIn; + } + + this.streamStatusProvider = streamStatusProvider; + this.outputTag = outputTag; + } + + @Override + public 
void collect(StreamRecord<T> record) { + if (this.outputTag != null) { + // we are not responsible for emitting to the main output. + return; + } + + pushToOperator(record); + } + + @Override + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { + if (this.outputTag == null || !this.outputTag.equals(outputTag)) { Review comment: `Objects.equals`? I guess we should also process the record if both tags are null. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ########## @@ -208,19 +206,100 @@ public OperatorChain( OperatorChain( List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, RecordWriterOutput<?>[] streamOutputs, - WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint, - StreamOperatorWrapper<OUT, OP> headOperatorWrapper) { + WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput, + StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) { this.streamOutputs = checkNotNull(streamOutputs); - this.chainEntryPoint = checkNotNull(chainEntryPoint); + this.mainOperatorOutput = checkNotNull(mainOperatorOutput); this.operatorEventDispatcher = null; checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0); - this.headOperatorWrapper = checkNotNull(headOperatorWrapper); + this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper); this.tailOperatorWrapper = allOperatorWrappers.get(0); this.numOperators = allOperatorWrappers.size(); + this.chainedSourceOutputs = new ChainedSourceOutputs(); - linkOperatorWrappers(allOperatorWrappers); + firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers); + } + + private void createChainOutputs( + List<StreamEdge> outEdgesInOrder, + RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate, + Map<Integer, StreamConfig> chainedConfigs, + StreamTask<OUT, OP> containingTask, + Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) { + for (int i = 0; i < outEdgesInOrder.size();
i++) { + StreamEdge outEdge = outEdgesInOrder.get(i); + + RecordWriterOutput<?> streamOutput = createStreamOutput( + recordWriterDelegate.getRecordWriter(i), + outEdge, + chainedConfigs.get(outEdge.getSourceId()), + containingTask.getEnvironment()); + + this.streamOutputs[i] = streamOutput; + streamOutputMap.put(outEdge, streamOutput); + } + } + + private ChainedSourceOutputs createChainedInputs( + StreamTask<OUT, OP> containingTask, + StreamConfig.Input[] configuredInputs, + Map<Integer, StreamConfig> chainedConfigs, + ClassLoader userCodeClassloader, + List<StreamOperatorWrapper<?, ?>> allOpWrappers) { + if (Arrays.stream(configuredInputs).noneMatch(input -> input instanceof SourceInput)) { + return new ChainedSourceOutputs(); + } + checkState( + mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator, Review comment: Isn't this method called for any operator? The check will not return if there are `NetworkInput`s configured. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ########## @@ -160,81 +159,84 @@ public TimeCharacteristic getTimeCharacteristic() { } } - public void setTypeSerializersIn(TypeSerializer<?> ...serializers) { - config.setInteger(TYPE_SERIALIZERS_IN_COUNT, serializers.length); - for (int i = 0; i < serializers.length; i++) { - setTypeSerializer(String.format(TYPE_SERIALIZERS_IN_PATTERN, i), serializers[i]); - } - } - public void setTypeSerializerOut(TypeSerializer<?> serializer) { setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer); } + public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) { + try { + return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl); + } catch (Exception e) { + throw new StreamTaskException("Could not instantiate serializer.", e); + } + } + public void setTypeSerializerSideOut(OutputTag<?> outputTag, TypeSerializer<?> serializer) { setTypeSerializer(TYPE_SERIALIZER_SIDEOUT_PREFIX + 
outputTag.getId(), serializer); } - @Deprecated - public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) { - return getTypeSerializerIn(0, cl); + private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) { + try { + InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key); + } catch (IOException e) { + throw new StreamTaskException("Could not serialize type serializer.", e); + } } - @Deprecated - public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) { - return getTypeSerializerIn(1, cl); + public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) { + Preconditions.checkNotNull(outputTag, "Side output id must not be null."); + try { + return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl); + } catch (Exception e) { + throw new StreamTaskException("Could not instantiate serializer.", e); + } } - public TypeSerializer<?>[] getTypeSerializersIn(ClassLoader cl) { - int typeSerializersCount = config.getInteger(TYPE_SERIALIZERS_IN_COUNT, -1); - checkState( - typeSerializersCount >= 0, - "Missing value for %s in the config? 
[%d]", - TYPE_SERIALIZERS_IN_COUNT, - typeSerializersCount); - TypeSerializer<?>[] typeSerializers = new TypeSerializer<?>[typeSerializersCount]; - for (int i = 0; i < typeSerializers.length; i++) { - typeSerializers[i] = getTypeSerializerIn(i, cl); + public void setTypeSerializersIn(TypeSerializer<?> ...serializers) { + Input[] inputs = new Input[serializers.length]; + for (int i = 0; i < serializers.length; i++) { + inputs[i] = new NetworkInput(serializers[i], i); } - return typeSerializers; + setInputs(inputs); } - public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader cl) { + public void setInputs(Input ...inputs) { try { - return InstantiationUtil.readObjectFromConfig( - this.config, - String.format(TYPE_SERIALIZERS_IN_PATTERN, index), - cl); - } catch (Exception e) { - throw new StreamTaskException( - String.format("Could not instantiate serializer for [%d] input.", index), - e); + InstantiationUtil.writeObjectToConfig(inputs, this.config, INPUTS); + } catch (IOException e) { + throw new StreamTaskException("Could not serialize inputs.", e); } } - public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) { + public Input[] getInputs(ClassLoader cl) { try { - return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl); + Input[] inputs = InstantiationUtil.readObjectFromConfig(this.config, INPUTS, cl); + if (inputs == null) { + return new Input[0]; + } + return inputs; } catch (Exception e) { - throw new StreamTaskException("Could not instantiate serializer.", e); + throw new StreamTaskException("Could not deserialize inputs", e); } } - public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) { - Preconditions.checkNotNull(outputTag, "Side output id must not be null."); - try { - return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl); - } catch (Exception e) { - throw new StreamTaskException("Could not instantiate 
serializer.", e); - } + @Deprecated + public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) { + return getTypeSerializerIn(0, cl); } - private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) { - try { - InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key); - } catch (IOException e) { - throw new StreamTaskException("Could not serialize type serializer.", e); + @Deprecated + public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) { + return getTypeSerializerIn(1, cl); + } + + public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader cl) { + Input[] inputs = getInputs(cl); + if (index >= inputs.length) { + return null; Review comment: Why can this happen? Should we return `Optional` or mark with `@Nullable`? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ########## @@ -160,81 +159,84 @@ public TimeCharacteristic getTimeCharacteristic() { } } - public void setTypeSerializersIn(TypeSerializer<?> ...serializers) { - config.setInteger(TYPE_SERIALIZERS_IN_COUNT, serializers.length); - for (int i = 0; i < serializers.length; i++) { - setTypeSerializer(String.format(TYPE_SERIALIZERS_IN_PATTERN, i), serializers[i]); - } - } - public void setTypeSerializerOut(TypeSerializer<?> serializer) { setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer); } + public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) { + try { + return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl); + } catch (Exception e) { + throw new StreamTaskException("Could not instantiate serializer.", e); + } + } + public void setTypeSerializerSideOut(OutputTag<?> outputTag, TypeSerializer<?> serializer) { setTypeSerializer(TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), serializer); } - @Deprecated - public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) { - return getTypeSerializerIn(0, cl); + private void 
setTypeSerializer(String key, TypeSerializer<?> typeWrapper) { + try { + InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key); + } catch (IOException e) { + throw new StreamTaskException("Could not serialize type serializer.", e); + } } - @Deprecated - public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) { - return getTypeSerializerIn(1, cl); + public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) { + Preconditions.checkNotNull(outputTag, "Side output id must not be null."); + try { + return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl); + } catch (Exception e) { + throw new StreamTaskException("Could not instantiate serializer.", e); + } } - public TypeSerializer<?>[] getTypeSerializersIn(ClassLoader cl) { - int typeSerializersCount = config.getInteger(TYPE_SERIALIZERS_IN_COUNT, -1); - checkState( - typeSerializersCount >= 0, - "Missing value for %s in the config? 
[%d]", - TYPE_SERIALIZERS_IN_COUNT, - typeSerializersCount); - TypeSerializer<?>[] typeSerializers = new TypeSerializer<?>[typeSerializersCount]; - for (int i = 0; i < typeSerializers.length; i++) { - typeSerializers[i] = getTypeSerializerIn(i, cl); + public void setTypeSerializersIn(TypeSerializer<?> ...serializers) { + Input[] inputs = new Input[serializers.length]; + for (int i = 0; i < serializers.length; i++) { + inputs[i] = new NetworkInput(serializers[i], i); } - return typeSerializers; + setInputs(inputs); } - public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader cl) { + public void setInputs(Input ...inputs) { try { - return InstantiationUtil.readObjectFromConfig( - this.config, - String.format(TYPE_SERIALIZERS_IN_PATTERN, index), - cl); - } catch (Exception e) { - throw new StreamTaskException( - String.format("Could not instantiate serializer for [%d] input.", index), - e); + InstantiationUtil.writeObjectToConfig(inputs, this.config, INPUTS); + } catch (IOException e) { + throw new StreamTaskException("Could not serialize inputs.", e); } } - public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) { + public Input[] getInputs(ClassLoader cl) { try { - return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl); + Input[] inputs = InstantiationUtil.readObjectFromConfig(this.config, INPUTS, cl); + if (inputs == null) { + return new Input[0]; + } + return inputs; } catch (Exception e) { - throw new StreamTaskException("Could not instantiate serializer.", e); + throw new StreamTaskException("Could not deserialize inputs", e); } } - public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) { - Preconditions.checkNotNull(outputTag, "Side output id must not be null."); - try { - return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_SIDEOUT_PREFIX + outputTag.getId(), cl); - } catch (Exception e) { - throw new StreamTaskException("Could not instantiate 
serializer.", e); - } + @Deprecated + public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) { + return getTypeSerializerIn(0, cl); } - private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) { - try { - InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key); - } catch (IOException e) { - throw new StreamTaskException("Could not serialize type serializer.", e); + @Deprecated + public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) { + return getTypeSerializerIn(1, cl); + } + + public <T> TypeSerializer<T> getTypeSerializerIn(int index, ClassLoader cl) { + Input[] inputs = getInputs(cl); + if (index >= inputs.length) { + return null; } + checkState(inputs[index] instanceof NetworkInput, "Input [%d] was assumed to be network input", index); Review comment: I think `checkState` only recognizes `%s` pattern. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java ########## @@ -53,42 +53,48 @@ public void init() throws Exception { StreamConfig configuration = getConfiguration(); ClassLoader userClassLoader = getUserCodeClassLoader(); - TypeSerializer<?>[] inputDeserializers = configuration.getTypeSerializersIn(userClassLoader); + StreamConfig.Input[] inputs = configuration.getInputs(userClassLoader); - ArrayList<IndexedInputGate>[] inputLists = new ArrayList[inputDeserializers.length]; - WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputDeserializers.length]; + ArrayList<IndexedInputGate>[] inputLists = new ArrayList[ + (int) Arrays.stream(inputs) + .filter(input -> (input instanceof StreamConfig.NetworkInput)) + .count()]; Review comment: Isn't it the same as `configuration.getNumberOfNetworkInputs` below? 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ########## @@ -547,302 +675,38 @@ private void linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> allOperatorW return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator(); } - // ------------------------------------------------------------------------ - // Collectors for output chaining - // ------------------------------------------------------------------------ - /** - * An {@link Output} that measures the last emitted watermark with a {@link WatermarkGauge}. - * - * @param <T> The type of the elements that can be emitted. + * Wrapper class to access the chained sources and their's outputs. */ - public interface WatermarkGaugeExposingOutput<T> extends Output<T> { - Gauge<Long> getWatermarkGauge(); - } - - static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> { - - protected final OneInputStreamOperator<T, ?> operator; - protected final Counter numRecordsIn; - protected final WatermarkGauge watermarkGauge = new WatermarkGauge(); - - protected final StreamStatusProvider streamStatusProvider; - - @Nullable - protected final OutputTag<T> outputTag; - - public ChainingOutput( - OneInputStreamOperator<T, ?> operator, - StreamStatusProvider streamStatusProvider, - @Nullable OutputTag<T> outputTag) { - this.operator = operator; - - { - Counter tmpNumRecordsIn; - try { - OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup(); - tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter(); - } catch (Exception e) { - LOG.warn("An exception occurred during the metrics setup.", e); - tmpNumRecordsIn = new SimpleCounter(); - } - numRecordsIn = tmpNumRecordsIn; - } + public static class ChainedSourceOutputs { + private final Map<Integer, WatermarkGaugeExposingOutput<StreamRecord<?>>> chainedSourceOutputs; + private final Map<Integer, SourceOperator<?, 
?>> sourceOperators; Review comment: I think having a single map on `OperatorChain` level with values holding a pair of Output + Operator would be easier to read and more efficient (by pair I mean a class, named maybe `ChainedSourceOutputs`). ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ########## @@ -208,19 +206,100 @@ public OperatorChain( OperatorChain( List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, RecordWriterOutput<?>[] streamOutputs, - WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint, - StreamOperatorWrapper<OUT, OP> headOperatorWrapper) { + WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput, + StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) { this.streamOutputs = checkNotNull(streamOutputs); - this.chainEntryPoint = checkNotNull(chainEntryPoint); + this.mainOperatorOutput = checkNotNull(mainOperatorOutput); this.operatorEventDispatcher = null; checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0); - this.headOperatorWrapper = checkNotNull(headOperatorWrapper); + this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper); this.tailOperatorWrapper = allOperatorWrappers.get(0); this.numOperators = allOperatorWrappers.size(); + this.chainedSourceOutputs = new ChainedSourceOutputs(); - linkOperatorWrappers(allOperatorWrappers); + firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers); + } + + private void createChainOutputs( + List<StreamEdge> outEdgesInOrder, + RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate, + Map<Integer, StreamConfig> chainedConfigs, + StreamTask<OUT, OP> containingTask, + Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) { + for (int i = 0; i < outEdgesInOrder.size(); i++) { + StreamEdge outEdge = outEdgesInOrder.get(i); + + RecordWriterOutput<?> streamOutput = createStreamOutput( + recordWriterDelegate.getRecordWriter(i), + outEdge, + 
chainedConfigs.get(outEdge.getSourceId()), + containingTask.getEnvironment()); + + this.streamOutputs[i] = streamOutput; + streamOutputMap.put(outEdge, streamOutput); + } + } + + private ChainedSourceOutputs createChainedInputs( Review comment: Something is off with the return type or the name here :) ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ########## @@ -547,302 +675,38 @@ private void linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> allOperatorW return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator(); } - // ------------------------------------------------------------------------ - // Collectors for output chaining - // ------------------------------------------------------------------------ - /** - * An {@link Output} that measures the last emitted watermark with a {@link WatermarkGauge}. - * - * @param <T> The type of the elements that can be emitted. + * Wrapper class to access the chained sources and their's outputs.
*/ - public interface WatermarkGaugeExposingOutput<T> extends Output<T> { - Gauge<Long> getWatermarkGauge(); - } - - static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> { - - protected final OneInputStreamOperator<T, ?> operator; - protected final Counter numRecordsIn; - protected final WatermarkGauge watermarkGauge = new WatermarkGauge(); - - protected final StreamStatusProvider streamStatusProvider; - - @Nullable - protected final OutputTag<T> outputTag; - - public ChainingOutput( - OneInputStreamOperator<T, ?> operator, - StreamStatusProvider streamStatusProvider, - @Nullable OutputTag<T> outputTag) { - this.operator = operator; - - { - Counter tmpNumRecordsIn; - try { - OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup(); - tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter(); - } catch (Exception e) { - LOG.warn("An exception occurred during the metrics setup.", e); - tmpNumRecordsIn = new SimpleCounter(); - } - numRecordsIn = tmpNumRecordsIn; - } + public static class ChainedSourceOutputs { + private final Map<Integer, WatermarkGaugeExposingOutput<StreamRecord<?>>> chainedSourceOutputs; + private final Map<Integer, SourceOperator<?, ?>> sourceOperators; Review comment: Map key is the index of input in `StreamConfig`, right? This seems a bit fragile and not obvious. How about keying by `Input` instances? I see we have them both on put and get. 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ########## @@ -90,16 +91,19 @@ private final RecordWriterOutput<?>[] streamOutputs; - private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint; + private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput; /** * For iteration, {@link StreamIterationHead} and {@link StreamIterationTail} used for executing - * feedback edges do not contain any operators, in which case, {@code headOperatorWrapper} and + * feedback edges do not contain any operators, in which case, {@code mainOperatorWrapper} and * {@code tailOperatorWrapper} are null. */ - @Nullable private final StreamOperatorWrapper<OUT, OP> headOperatorWrapper; + @Nullable private final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper; + @Nullable private final StreamOperatorWrapper<?, ?> firstOperatorWrapper; Review comment: Why do we need `first`? Isn't it the same as `main`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org