[ https://issues.apache.org/jira/browse/FLINK-13148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16935323#comment-16935323 ]
Congxian Qiu(klion26) commented on FLINK-13148:
-----------------------------------------------

Hi [~kkl0u], do you think we can get this feature into 1.10?

> Expose WindowedStream.sideOutputLateData() from CoGroupedStreams
> ----------------------------------------------------------------
>
>                 Key: FLINK-13148
>                 URL: https://issues.apache.org/jira/browse/FLINK-13148
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.9.0
>            Reporter: Congxian Qiu(klion26)
>            Assignee: Congxian Qiu(klion26)
>            Priority: Major
>
> FLINK-10050 added support for {{allowedLateness}}, but there is still no way
> to get the side output containing the late data. This issue aims to fix that.
> For the implementation, I want to add an {{OutputTag}} parameter to the
> {{WithWindow}} constructor, as follows:
> {code:java}
> protected WithWindow(DataStream<T1> input1,
>         DataStream<T2> input2,
>         KeySelector<T1, KEY> keySelector1,
>         KeySelector<T2, KEY> keySelector2,
>         TypeInformation<KEY> keyType,
>         WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
>         Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
>         Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
>         Time allowedLateness,
>         OutputTag<TaggedUnion<T1, T2>> outputTag) {
>     ...
> }
> {code}
> and add a method {{sideOutputLateData(OutputTag<TaggedUnion<T1, T2>> outputTag)}}
> to {{WithWindow}}:
> {code:java}
> public WithWindow<T1, T2, KEY, W> sideOutputLateData(OutputTag<TaggedUnion<T1, T2>> outputTag) {
>     ...
> }
> {code}
> {{WithWindow#apply}} will then pass {{outputTag}} to the windowed stream if it
> is not null:
> {code:java}
> public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
>     ...
>     if (outputTag != null) {
>         windowedStream.sideOutputLateData(outputTag);
>     }
>     ...
> }
> {code}
> The same will apply to {{JoinedStreams}}.
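
To illustrate the control flow proposed in the description, here is a minimal, dependency-free sketch. It does not use Flink: {{LateTag}}, {{WindowedStreamSketch}} and {{WithWindowSketch}} are hypothetical stand-ins for {{OutputTag}}, {{WindowedStream}} and {{CoGroupedStreams.WithWindow}}, and returning a new builder from {{sideOutputLateData}} is an assumption of this sketch, not something the issue specifies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's OutputTag; it only carries an id.
final class LateTag {
    final String id;

    LateTag(String id) {
        this.id = id;
    }
}

// Hypothetical stand-in for WindowedStream: late elements are kept
// only when a tag has been registered, otherwise they are dropped.
final class WindowedStreamSketch {
    private LateTag lateTag; // null means "drop late elements"
    final List<String> mainOutput = new ArrayList<>();
    final List<String> lateOutput = new ArrayList<>();

    WindowedStreamSketch sideOutputLateData(LateTag tag) {
        this.lateTag = tag;
        return this;
    }

    void process(String element, boolean isLate) {
        if (!isLate) {
            mainOutput.add(element);
        } else if (lateTag != null) {
            lateOutput.add(element);
        }
        // A late element with no tag registered is silently dropped.
    }
}

// Hypothetical stand-in for CoGroupedStreams.WithWindow: the builder
// carries an optional tag and hands it to the windowed stream in apply().
final class WithWindowSketch {
    private final LateTag outputTag;

    WithWindowSketch(LateTag outputTag) {
        this.outputTag = outputTag;
    }

    // Assumption: return a new builder instead of mutating this one.
    WithWindowSketch sideOutputLateData(LateTag tag) {
        return new WithWindowSketch(tag);
    }

    WindowedStreamSketch apply() {
        WindowedStreamSketch windowedStream = new WindowedStreamSketch();
        if (outputTag != null) {
            windowedStream.sideOutputLateData(outputTag);
        }
        return windowedStream;
    }
}
```

In this sketch, calling {{new WithWindowSketch(null).sideOutputLateData(new LateTag("late")).apply()}} yields a stream that keeps late elements in {{lateOutput}}, while skipping the {{sideOutputLateData}} call leaves the current behaviour (late elements are dropped) unchanged.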