[ 
https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121595#comment-16121595
 ] 

ASF GitHub Bot commented on FLINK-6244:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4320#discussion_r132446724
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---
    @@ -163,14 +251,34 @@
                        null,
                        false);
     
    +           final OutputTag<L> outputTag = new OutputTag<L>("dummy-timeouted", leftTypeInfo);
    +
    +           final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream(
    +                   inputStream,
    +                   pattern,
    +                   clean(patternSelectFunction),
    +                   rightTypeInfo,
    +                   outputTag,
    +                   clean(patternTimeoutFunction));
    +
    +           final DataStream<L> timeoutedStream = mainStream.getSideOutput(outputTag);
    +
                TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
     
    -           return patternStream.map(
    -                   new PatternSelectTimeoutMapper<>(
    -                           patternStream.getExecutionEnvironment().clean(patternSelectFunction),
    -                           patternStream.getExecutionEnvironment().clean(patternTimeoutFunction)
    -                   )
    -           ).returns(outTypeInfo);
    +           return mainStream.connect(timeoutedStream).map(new CoMapFunction<R, L, Either<L, R>>() {
    --- End diff --
    
    I think using an anonymous inner class here can be problematic because it 
will capture the `PatternStream` in its closure, if I'm not mistaken.
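
To illustrate the concern, here is a minimal, self-contained sketch in plain Java. It does not use Flink: `ClosureCaptureSketch`, `Mapper`, `Holder` and `PrefixMapper` are hypothetical names, with `Holder` standing in for a non-serializable enclosing object such as `PatternStream` and `Mapper` standing in for a serializable user function. In this sketch the anonymous class reads a field of the enclosing object, which guarantees that it holds a reference to it; compilers have historically kept that reference even when it is unused. A static nested class that receives only the values it needs avoids the capture.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureCaptureSketch {

    /** Hypothetical stand-in for a serializable user function. */
    public interface Mapper extends Serializable {
        String map(String value);
    }

    /** Hypothetical stand-in for the enclosing object; deliberately NOT serializable. */
    public static class Holder {
        private final String prefix = "timeout:";

        /** Anonymous class: reads Holder.this.prefix, so it keeps a reference to the Holder. */
        public Mapper anonymousMapper() {
            return new Mapper() {
                @Override
                public String map(String value) {
                    return prefix + value;
                }
            };
        }

        /** Static nested class: only the String is handed over, not the Holder. */
        public Mapper staticMapper() {
            return new PrefixMapper(prefix);
        }
    }

    /** Has no hidden reference to any enclosing instance. */
    public static class PrefixMapper implements Mapper {
        private final String prefix;

        public PrefixMapper(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String map(String value) {
            return prefix + value;
        }
    }

    /** Returns true if Java serialization can write the object, false if some field is not serializable. */
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Holder holder = new Holder();
        System.out.println(isSerializable(holder.anonymousMapper())); // prints false
        System.out.println(isSerializable(holder.staticMapper()));    // prints true
    }
}
```

Both mappers behave identically when called; only the anonymous one drags the enclosing object along when it is serialized, which is the situation the comment above warns about.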


> Emit timeouted Patterns as Side Output
> --------------------------------------
>
>                 Key: FLINK-6244
>                 URL: https://issues.apache.org/jira/browse/FLINK-6244
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>             Fix For: 1.4.0
>
>
> Now that we have SideOutputs, I think timeouted patterns should be emitted into 
> them rather than producing a stream of `Either`.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
