[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925923#comment-15925923 ]
ASF GitHub Bot commented on FLINK-4460: --------------------------------------- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106139311 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -326,33 +327,46 @@ public int getChainLength() { Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, Map<StreamEdge, RecordWriterOutput<?>> streamOutputs, - List<StreamOperator<?>> allOperators) + List<StreamOperator<?>> allOperators, + OutputTag<IN> outputTag) { // create the output that the operator writes to first. this may recursively create more operators Output<StreamRecord<OUT>> output = createOutputCollector( containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators); // now create the operator and give it the output collector to write its output to OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader); + chainedOperator.setup(containingTask, operatorConfig, output); allOperators.add(chainedOperator); if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - return new ChainingOutput<>(chainedOperator, this); + return new ChainingOutput<>(chainedOperator, this, outputTag); } else { TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader); - return new CopyingChainingOutput<>(chainedOperator, inSerializer, this); + return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this); } } private <T> RecordWriterOutput<T> createStreamOutput( StreamEdge edge, StreamConfig upStreamConfig, int outputIndex, Environment taskEnvironment, - String taskName) - { - TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader()); + String taskName) { + OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput + + TypeSerializer outSerializer = null; + + if (edge.getOutputTag() != null) { + // side output + outSerializer = upStreamConfig.getTypeSerializerSideOut( + edge.getOutputTag(), taskEnvironment.getUserClassLoader()); + } else { + // main output --- End diff -- this can become one line. > Side Outputs in Flink > --------------------- > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API > Affects Versions: 1.2.0, 1.1.3 > Reporter: Chen Qin > Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)