[ https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121595#comment-16121595 ]
ASF GitHub Bot commented on FLINK-6244:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4320#discussion_r132446724

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---
    @@ -163,14 +251,34 @@
                     null,
                     false);
    +
    +        final OutputTag<L> outputTag = new OutputTag<L>("dummy-timeouted", leftTypeInfo);
    +
    +        final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream(
    +            inputStream,
    +            pattern,
    +            clean(patternSelectFunction),
    +            rightTypeInfo,
    +            outputTag,
    +            clean(patternTimeoutFunction));
    +
    +        final DataStream<L> timeoutedStream = mainStream.getSideOutput(outputTag);
    +
             TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
    -        return patternStream.map(
    -            new PatternSelectTimeoutMapper<>(
    -                patternStream.getExecutionEnvironment().clean(patternSelectFunction),
    -                patternStream.getExecutionEnvironment().clean(patternTimeoutFunction)
    -            )
    -        ).returns(outTypeInfo);
    +        return mainStream.connect(timeoutedStream).map(new CoMapFunction<R, L, Either<L, R>>() {
    --- End diff --

    I think using an anonymous inner class here can be problematic because it will capture the `PatternStream` in its closure, if I'm not mistaken.

> Emit timeouted Patterns as Side Output
> --------------------------------------
>
>                 Key: FLINK-6244
>                 URL: https://issues.apache.org/jira/browse/FLINK-6244
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>             Fix For: 1.4.0
>
> Now that we have SideOutputs I think timeouted patterns should be emitted into them rather than producing a stream of `Either`

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
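
To illustrate the closure concern raised in the review comment above, here is a minimal sketch in plain Java with no Flink dependency. All names are hypothetical: `Holder` stands in for a non-serializable enclosing object such as `PatternStream`, and `Mapper` stands in for a serializable user function such as `CoMapFunction`. It assumes javac's usual behaviour of giving a serializable anonymous class, created in an instance method, a hidden reference to its enclosing instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class ClosureCaptureSketch {

    /** Hypothetical stand-in for a serializable user function. */
    public interface Mapper extends Serializable {
        String map(String value);
    }

    /** Hypothetical stand-in for an enclosing object that is NOT serializable. */
    public static class Holder {

        /** Anonymous class: carries a hidden reference to this Holder. */
        public Mapper anonymousMapper() {
            return new Mapper() {
                @Override
                public String map(String value) {
                    return "right:" + value;
                }
            };
        }

        /** Static nested class: has no reference to any Holder instance. */
        public Mapper staticMapper() {
            return new StaticMapper();
        }

        private static final class StaticMapper implements Mapper {
            @Override
            public String map(String value) {
                return "right:" + value;
            }
        }
    }

    /** Returns true if Java serialization can write the object graph rooted at o. */
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // Some object reachable from o (here: the captured Holder) is not Serializable.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Holder holder = new Holder();
        System.out.println("anonymous serializable: " + isSerializable(holder.anonymousMapper()));
        System.out.println("static nested serializable: " + isSerializable(holder.staticMapper()));
    }
}
```

Under that assumption, the anonymous mapper drags the `Holder` into its serialized form and fails, while the static nested variant serializes on its own. Moving the `CoMapFunction` into a static nested (or top-level) class would be one way to address the comment. Whether the PR ultimately took that route is not stated here.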