Thank you very much, that was really helpful

Cheers,
Federico

2017-11-08 13:51 GMT+01:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com>:

> Unfortunately there is a mistake in the docs: the return type should be
> DataStream rather than SingleOutputStreamOperator.
>
> The correct version should be:
>
> val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
>
> val outputTag = OutputTag[String]("side-output")
>
> val result: DataStream[ComplexEvent] = patternStream.select(outputTag){
>         (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
> } {
>         pattern: Map[String, Iterable[Event]] => ComplexEvent()
> }
>
> This syntax is only available in 1.4 though; in previous versions
> timed-out events were not returned via a side output.
>
>
>
> > On 8 Nov 2017, at 12:18, Federico D'Ambrosio <federico.dambro...@smartlab.ws> wrote:
> >
> > Thank you very much, Dawid, for your thorough explanation, really
> > useful. I totally missed the distinction between timed-out events and
> > complete matches.
> >
> > I'd like to ask you one more thing, about the FlinkCEP Scala API: in the
> > documentation, there is the following code:
> >
> > val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
> >
> > val outputTag = OutputTag[String]("side-output")
> >
> > val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
> >     (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
> > } {
> >     pattern: Map[String, Iterable[Event]] => ComplexEvent()
> > }
> >
> > where result would then be used to get the side output for outputTag.
> > If I paste this code, I get an error that the select function is missing its
> > parameters ("Unspecified value parameters: patternSelectFunction:
> > PatternSelectFunction[ComplexEvent, NotInferredR]"),
> > while if I add the type parameters explicitly, such as
> >
> > patternStream.select[TimeoutEvent, ComplexEvent]
> >
> > I get "Too many arguments for select". Am I missing something?
> >
> > Thank you very much,
> > Federico
> >
> > 2017-11-07 16:34 GMT+01:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com>:
> > Hi Federico,
> >
> > For your given input and pattern there should be (and there are) only two
> > timed-out patterns:
> >
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02))))
> > 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02))))
> >
> > This is because your pattern says that the event following the events with
> > value >= 100 must not have value >= 100. And within your timeout there is
> > no sequence of events matching (>=100)+ (<100).
> >
> > But I will try to explain how it works with the same input for Pattern:
> >
> > Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> > .notNext("end").where(_.value <100).within(Time.minutes(30))
> >
> > Then we have matches:
> >
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02))))
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02))))
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02))))
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02))))
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02))))
> > 5> Right(Map(start -> List(Event(100,2017-11-05T03:54:02))))
> >
> > and timed-out partial matches:
> >
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02))))
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:52:02), Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02))))
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:54:02), Event(100,2017-11-05T03:56:02))))
> > 5> Left(Map(start -> List(Event(100,2017-11-05T03:56:02))))
> > 5> Left(Map(start -> List(Event(100,2017-11-05T06:00:02))))
> >
> > Right now (in Flink 1.3.2) a pattern can start on each event (in 1.4 you
> > will be able to specify an AFTER_MATCH_SKIP strategy, see
> > https://issues.apache.org/jira/browse/FLINK-7169); therefore you see
> > matches starting at 2017-11-05T03:50:02, 2017-11-05T03:52:02 and
> > 2017-11-05T03:54:02.
> > Also, right now oneOrMore is not greedy (in 1.4 you will be able to
> > alter this, see https://issues.apache.org/jira/browse/FLINK-7147);
> > therefore you see matches like List(Event(100,2017-11-05T03:50:02)) and
> > List(Event(100,2017-11-05T03:50:02), Event(100,2017-11-05T03:52:02))
> > rather than only one of those.
> >
> > The timed-out partial matches are returned because within the timeout
> > there was no event with value <100 (in fact there was no event at all to be
> > checked).
> >
> > Hope this "study" helps you understand the behaviour. If you feel I
> > missed something, please provide an example I can reproduce.
> >
> > Regards,
> > Dawid
> >
> > 2017-11-07 11:29 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> > Hey Federico,
> >
> > let me pull in Dawid (cc'd) who works on CEP. He can probably clarify
> > the expected behaviour here.
> >
> > Best,
> >
> > Ufuk
> >
> >
> > On Mon, Nov 6, 2017 at 12:06 PM, Federico D'Ambrosio
> > <federico.dambro...@smartlab.ws> wrote:
> > > Hi everyone,
> > >
> > > I wanted to ask whether FlinkCEP is working as it should in the following
> > > scenario, or whether I have misunderstood how it works.
> > >
> > > I've got a KeyedStream associated with the following pattern:
> > >
> > > Pattern[Event].begin("start").where(_.value >=100).oneOrMore
> > > .notNext("end").where(_.value >=100).within(Time.minutes(30))
> > >
> > > Considering a single key in the stream, for simplicity, I've got the
> > > following sequence of events (using EventTime on the "time" field of the
> > > JSON event):
> > >
> > > {value: 100, time: "2017-11-05 03:50:02.000"}
> > > {value: 100, time: "2017-11-05 03:52:02.000"}
> > > {value: 100, time: "2017-11-05 03:54:02.000"}
> > > {value: 100, time: "2017-11-05 03:56:02.000"} // end of events within the 30 minutes from the first event
> > > {value: 100, time: "2017-11-05 06:00:02.000"}
> > >
> > > Now, when it comes to the select/flatSelect function, I tried printing the
> > > content of the pattern map, and what I noticed is that, for example, the
> > > first 2 events weren't considered part of the same pattern, as the map
> > > looked like the following:
> > >
> > > {start=[{value: 100, time: 2017-11-05 03:50:02.000}]}
> > > {start=[{value: 100, time: 2017-11-05 03:52:02.000}]}
> > >
> > > Now, shouldn't they be in the same List, as they belong to the same
> > > iterative pattern, defined with the oneOrMore clause?
> > >
> > > Thank you for your insight,
> > > Federico D'Ambrosio
> >
> >
> >
> >
> > --
> > Federico D'Ambrosio
>
>
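
For anyone reading this thread later: the two points in Dawid's explanation (a pattern can start on every event, and oneOrMore is not greedy in 1.3.2) can be played with in plain Scala, without Flink. The sketch below is only a toy model, not Flink's NFA. The Event case class, the OneOrMoreSketch object and its enumerate method are hypothetical names made up for illustration, and the model is hard-wired to Dawid's variant of the pattern (start: value >= 100, oneOrMore, notNext value < 100, within 30 minutes). It assumes runs are contiguous and that a run is only emitted as a complete match once a following event arrives inside the window.

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical stand-in for the thread's Event(value, time) type.
final case class Event(value: Int, time: LocalDateTime)

object OneOrMoreSketch {
  // Left = timed-out partial match, Right = complete match (same shape as the printed output).
  type Match = Either[List[Event], List[Event]]

  // Mirrors within(Time.minutes(30)).
  val window: Duration = Duration.ofMinutes(30)

  private def inWindow(first: Event, other: Event): Boolean =
    Duration.between(first.time, other.time).compareTo(window) <= 0

  // Decide what happens to a run given the event that follows it (if any).
  private def classify(run: List[Event], next: Option[Event]): Option[Match] = next match {
    case Some(n) if inWindow(run.head, n) =>
      // notNext("end").where(_.value < 100): a small next value discards the run.
      if (n.value < 100) None else Some(Right(run))
    case _ =>
      // No following event inside the window: the run times out.
      Some(Left(run))
  }

  def enumerate(events: Vector[Event]): Vector[Match] = {
    val candidates = for {
      start <- events.indices.toVector   // a match may start on every event
      end   <- start until events.length // non-greedy: every run length is a candidate
      run = events.slice(start, end + 1).toList
      if run.forall(_.value >= 100) && inWindow(run.head, run.last)
    } yield (run, events.lift(end + 1))

    candidates.flatMap { case (run, next) => classify(run, next).toList }
  }

  // The five events from the thread, for a single key.
  val sample: Vector[Event] = Vector(
    "2017-11-05T03:50:02", "2017-11-05T03:52:02", "2017-11-05T03:54:02",
    "2017-11-05T03:56:02", "2017-11-05T06:00:02"
  ).map(t => Event(100, LocalDateTime.parse(t)))
}
```

Calling OneOrMoreSketch.enumerate(OneOrMoreSketch.sample).foreach(println) on the thread's input gives six Right and five Left entries, the same sets Dawid lists, though in a different order. It does not reproduce the result for the original pattern (notNext with value >= 100), so treat it as an aid for reading the output rather than a description of how Flink evaluates patterns.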


-- 
Federico D'Ambrosio
