[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925920#comment-15925920 ]
ASF GitHub Bot commented on FLINK-4460: --------------------------------------- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106139530 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -441,26 +486,53 @@ public void close() { private static final class CopyingChainingOutput<T> extends ChainingOutput<T> { private final TypeSerializer<T> serializer; - + public CopyingChainingOutput( OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer, + OutputTag<T> outputTag, StreamStatusProvider streamStatusProvider) { - super(operator, streamStatusProvider); + super(operator, streamStatusProvider, outputTag); this.serializer = serializer; } @Override public void collect(StreamRecord<T> record) { + if (this.outputTag != null) { + // we are only responsible for emitting to the main input + return; + } + + pushToOperator(record); + } + + @Override + public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + if (this.outputTag == null || !this.outputTag.equals(outputTag)) { + // we are only responsible for emitting to the side-output specified by our + // OutputTag. + return; + } + + pushToOperator(record); + } + + @Override --- End diff -- This can become `private`, as before. > Side Outputs in Flink > --------------------- > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API > Affects Versions: 1.2.0, 1.1.3 > Reporter: Chen Qin > Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)