Hi Till,

Excellent, I'll check out the current snapshot version! Thank you for taking the time to look into this.

Regards,
David

On Tue, Nov 8, 2016 at 3:25 PM, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi David,
>
> sorry for my late reply. I just found time to look into the problem. You were right with your observation that the CEP operator did not behave as I've described it. The problem was that the time of the underlying NFA was not advanced if there were no events buffered in the CEP operator when a new watermark arrived. This was not intended, and I opened a PR [1] to fix this problem. I've tested the fix with your example program, and it seems to solve the problem of not seeing timeouts after the timeout interval has passed. Thanks for reporting this problem, and please excuse my long response time.
>
> Btw, I'll merge the PR this evening, so it should be included in the current snapshot version by the end of tomorrow.
>
> [1] https://github.com/apache/flink/pull/2771
>
> Cheers,
> Till
>
> On Fri, Oct 14, 2016 at 11:40 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>> Hi guys,
>>
>> I'll try to come up with an example illustrating the behaviour over the weekend.
>>
>> Cheers,
>> Till
>>
>> On Fri, Oct 14, 2016 at 11:16 AM, David Koch <ogd...@googlemail.com> wrote:
>>> Hello,
>>>
>>> Thanks for the code, Sameer. Unfortunately, it didn't solve the issue. Compared to what I did, the principle is the same: make sure that the watermark advances even when no events are present, so that timeouts in CEP patterns are triggered.
>>>
>>> If Till or anyone else could provide a minimal example illustrating the supposed behaviour of:
>>>
>>>> [CEP] timeout will be detected when the first watermark exceeding the timeout value is received
>>>
>>> I'd very much appreciate it.
>>>
>>> Regards,
>>>
>>> David
>>>
>>> On Wed, Oct 12, 2016 at 1:54 AM, Sameer W <sam...@axiomine.com> wrote:
>>>> Try this. Your WMs need to move forward. Also, don't use the system timestamp.
>>>> Use the timestamp of the element seen as the reference, as the elements are most likely lagging the system timestamp.
>>>>
>>>>     import java.util.Date;
>>>>     import org.apache.flink.streaming.api.datastream.DataStream;
>>>>     import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>>>     import org.apache.flink.streaming.api.watermark.Watermark;
>>>>
>>>>     DataStream<Event> withTimestampsAndWatermarks = tuples
>>>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>>>
>>>>             long waterMarkTmst;
>>>>             long lastEmittedWM = 0;
>>>>
>>>>             @Override
>>>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>>>                 if (element.tmst > lastEmittedWM) {
>>>>                     // Assumes increasing timestamps. Subtract 1 because more elements
>>>>                     // with the same timestamp might still arrive; never move backwards.
>>>>                     waterMarkTmst = Math.max(waterMarkTmst, element.tmst - 1);
>>>>                 }
>>>>                 return element.tmst;
>>>>             }
>>>>
>>>>             @Override
>>>>             public Watermark getCurrentWatermark() {
>>>>                 if (lastEmittedWM == waterMarkTmst) {
>>>>                     // No new event seen: move the WM forward by the auto-watermark
>>>>                     // interval (watermarks only move forward in time).
>>>>                     waterMarkTmst = waterMarkTmst + 1000L;
>>>>                 }
>>>>                 lastEmittedWM = waterMarkTmst;
>>>>
>>>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>>>                 // Until an event is seen, the WM advances from 0 by 1000 ms per call.
>>>>                 return new Watermark(waterMarkTmst);
>>>>             }
>>>>         }).keyBy("key");
>>>>
>>>> On Tue, Oct 11, 2016 at 7:29 PM, David Koch <ogd...@googlemail.com> wrote:
>>>>> Hello,
>>>>>
>>>>> I tried setting the watermark to System.currentTimeMillis() - 5000L, with event timestamps set to System.currentTimeMillis(). I do not observe the expected behaviour of the PatternTimeoutFunction firing once the watermark moves past the timeout "anchored" by a pattern match.
>>>>>
>>>>> Here is the complete test class source <http://pastebin.com/9WxGq2wv>, in case someone is interested.
>>>>> The timestamp/watermark assigner looks like this:
>>>>>
>>>>>     DataStream<Event> withTimestampsAndWatermarks = tuples
>>>>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>>>>
>>>>>             long waterMarkTmst;
>>>>>
>>>>>             @Override
>>>>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>>>>                 return element.tmst;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public Watermark getCurrentWatermark() {
>>>>>                 waterMarkTmst = System.currentTimeMillis() - 5000L;
>>>>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>>>>                 return new Watermark(waterMarkTmst);
>>>>>             }
>>>>>         }).keyBy("key");
>>>>>
>>>>>     withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
>>>>>
>>>>>     // Apply pattern filtering on stream.
>>>>>     PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
>>>>>
>>>>> Any idea what's wrong?
>>>>>
>>>>> David
>>>>>
>>>>> On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sam...@axiomine.com> wrote:
>>>>>> Suppose an element arrives whose timestamp is already behind the last emitted watermark. Would it just be dropped, because the PatternStream does not have a max allowed lateness method? In that case it appears that CEP cannot yet handle late events out of the box.
>>>>>>
>>>>>> If we do want to support late events, can we chain a keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy() again before handing the stream to the CEP operator? This way the patterns may fire multiple times, but it allows an event to be late and out of order. It looks like it will work, but is there a less convoluted way?
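
The late-event question above depends on comparing each element's timestamp with the last emitted watermark. The following is a minimal, Flink-free model of that comparison, not Flink or CEP code. `LatenessModel`, its nested `Arrival` enum and the `allowedLatenessMs` tolerance are hypothetical names introduced for illustration. Nothing in this thread confirms how the CEP operator itself treats such elements.

```java
/**
 * Hypothetical, Flink-free model (not a Flink or CEP API): classifies elements
 * against the last emitted watermark. A watermark W asserts that no element with
 * timestamp <= W should follow, so such an element is late. allowedLatenessMs is
 * an assumed per-element tolerance introduced for illustration only.
 */
final class LatenessModel {
    /** Outcome of checking one element against the last emitted watermark. */
    enum Arrival { ON_TIME, LATE_WITHIN_ALLOWANCE, TOO_LATE }

    private final long allowedLatenessMs;
    private long lastEmittedWatermark = Long.MIN_VALUE; // nothing emitted yet

    LatenessModel(long allowedLatenessMs) {
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Watermarks only move forward, so an older watermark is ignored. */
    void emitWatermark(long watermark) {
        lastEmittedWatermark = Math.max(lastEmittedWatermark, watermark);
    }

    Arrival classify(long timestamp) {
        if (timestamp > lastEmittedWatermark) {
            return Arrival.ON_TIME;
        }
        // Late, but still within the assumed tolerance?
        if (timestamp + allowedLatenessMs > lastEmittedWatermark) {
            return Arrival.LATE_WITHIN_ALLOWANCE;
        }
        return Arrival.TOO_LATE;
    }

    public static void main(String[] args) {
        LatenessModel model = new LatenessModel(2_000L);
        model.emitWatermark(10_000L);
        for (long ts : new long[] {10_001L, 9_000L, 8_000L}) {
            System.out.println(ts + " -> " + model.classify(ts));
        }
    }
}
```

Assume a watermark of 10000 and a 2000 ms tolerance. Under those assumptions, `main` classifies 10001 as ON_TIME, 9000 as LATE_WITHIN_ALLOWANCE and 8000 as TOO_LATE.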
>>>>>>
>>>>>> Thanks,
>>>>>> Sameer
>>>>>>
>>>>>> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>>>>>>> But then the sources must not emit any element whose timestamp is behind the last emitted watermark. If that is the case, then this solution should work.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sam...@axiomine.com> wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> If you know that the events arrive in order and with a consistent lag, why not just increment the watermark every time the getCurrentWatermark() method is invoked, based on the autoWatermarkInterval (or less, to be conservative)?
>>>>>>>>
>>>>>>>> You can check whether the watermark has changed since the arrival of the last event and, if not, increment it in the getCurrentWatermark() method. Otherwise the watermark will never increase until an element arrives, and if the stream partition stalls for some reason, the whole pipeline freezes.
>>>>>>>>
>>>>>>>> Sameer
>>>>>>>>
>>>>>>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> the problem is still that there is no corresponding watermark saying that 4 seconds have now passed. With your code, watermarks will be emitted periodically, but the same watermark will be emitted until a new element arrives and advances it. Thus, until such a watermark is seen, the system can never know whether an earlier event will still arrive. I fear that this is a fundamental problem of stream processing.
>>>>>>>>>
>>>>>>>>> You're right that the negation operator won't solve the problem. It will indeed suffer from the same problem.
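
Two ideas from this part of the thread can be compared in a small Flink-free simulation. Till's point is that the same watermark is re-emitted until a newer element arrives. Sameer's suggestion is to advance the watermark on idle polls.

The sketch makes several assumptions:

- `PeriodicWatermarkModel`, `tick()` and `firstTickReaching()` are hypothetical names.
- A tick stands for one `getCurrentWatermark()` call per auto-watermark interval.
- The 4-second deadline follows David's "A followed by B within 4 seconds" example.
- The timeout counts as reached once the watermark is at or past Event A's timestamp plus 4000 ms.

```java
/**
 * Hypothetical, Flink-free model of a periodic watermark generator polled once
 * per auto-watermark interval (one tick). The watermark is (highest timestamp
 * seen - 1). If advanceWhenIdle is set, every tick without a newer element
 * moves the watermark forward by intervalMs, as suggested in this thread.
 */
final class PeriodicWatermarkModel {
    private final long intervalMs;
    private final boolean advanceWhenIdle;
    private long highestCandidate = Long.MIN_VALUE; // highest (timestamp - 1) seen so far
    private long watermark = Long.MIN_VALUE;        // last emitted watermark
    private boolean newerElementSinceLastTick = false;

    PeriodicWatermarkModel(long intervalMs, boolean advanceWhenIdle) {
        this.intervalMs = intervalMs;
        this.advanceWhenIdle = advanceWhenIdle;
    }

    void onElement(long timestamp) {
        // Subtract 1 because more elements with the same timestamp may still arrive.
        if (timestamp - 1 > highestCandidate) {
            highestCandidate = timestamp - 1;
            newerElementSinceLastTick = true;
        }
    }

    /** Called once per interval; returns the watermark that would be emitted. */
    long tick() {
        if (newerElementSinceLastTick) {
            watermark = Math.max(watermark, highestCandidate); // never move backwards
        } else if (advanceWhenIdle && watermark != Long.MIN_VALUE) {
            watermark += intervalMs; // idle: assume event time keeps pace with wall time
        }
        newerElementSinceLastTick = false;
        return watermark;
    }

    /** 1-based tick at which the watermark first reaches deadline, or -1 within maxTicks. */
    static int firstTickReaching(PeriodicWatermarkModel model, long deadline, int maxTicks) {
        for (int tick = 1; tick <= maxTicks; tick++) {
            if (model.tick() >= deadline) {
                return tick;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long eventA = 0L;
        long timeoutDeadline = eventA + 4_000L; // A must be followed by B within 4 s
        PeriodicWatermarkModel stalled = new PeriodicWatermarkModel(1_000L, false);
        PeriodicWatermarkModel advancing = new PeriodicWatermarkModel(1_000L, true);
        stalled.onElement(eventA);
        advancing.onElement(eventA);
        System.out.println("stalled:   " + firstTickReaching(stalled, timeoutDeadline, 20));
        System.out.println("advancing: " + firstTickReaching(advancing, timeoutDeadline, 20));
    }
}
```

Take a single Event A at timestamp 0 and a 1000 ms interval.

- **Stalled variant:** it never reaches 4000, so `firstTickReaching` returns -1.
- **Advancing variant:** it emits -1, 999, 1999, 2999, 3999, 4999 and reaches the deadline on tick 6.

The cost is the one Till names. Suppose an element with timestamp 1500 arrives after the watermark has been pushed to 1999. That element is already late.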
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>>>>>>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP "not" operator) does not address this because, again, how would the "not match" be triggered if no event at all occurs?
>>>>>>>>>>
>>>>>>>>>> Good question.
>>>>>>>>>>
>>>>>>>>>> I'm not sure whether the following will work:
>>>>>>>>>>
>>>>>>>>>> This could be done by creating a CEP matching pattern that uses both the "notNext" (or "notFollowedBy") and "within" constructs. Something like this:
>>>>>>>>>>
>>>>>>>>>>     Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>>>>>>         .notNext("second")
>>>>>>>>>>         .within(Time.seconds(3));
>>>>>>>>>>
>>>>>>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>>>>>>
>>>>>>>>>> Note: I have requested that these negation patterns be implemented in Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>>>>>>>>
>>>>>>>>>> - LF
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* David Koch <ogd...@googlemail.com>
>>>>>>>>>> *To:* user@flink.apache.org; lg...@yahoo.com
>>>>>>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> Thank you for the explanation as well as the link to the other post. It was interesting to learn about some of the open JIRAs.
>>>>>>>>>>
>>>>>>>>>> Indeed, I was not using event time, but processing time. However, even when using event time, I only get notified of timeouts upon subsequent events.
>>>>>>>>>>
>>>>>>>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I read <key> <value> from a socket, wrap this in a custom "event" with a timestamp, key the resulting stream by <key>, and attempt to detect <key> instances no further than 3 seconds apart using CEP.
>>>>>>>>>>
>>>>>>>>>> Apart from the fact that results are only printed when I close the socket (normal?), I don't observe any change in behaviour.
>>>>>>>>>>
>>>>>>>>>> So event time/watermarks or not: SOME event has to occur for the timeout to be triggered.
>>>>>>>>>>
>>>>>>>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP "not" operator) does not address this because, again, how would the "not match" be triggered if no event at all occurs?
>>>>>>>>>>
>>>>>>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>>>>>>>>>
>>>>>>>>>> The following is a better link:
>>>>>>>>>>
>>>>>>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E>
>>>>>>>>>>
>>>>>>>>>> - LF
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* "lg...@yahoo.com" <lg...@yahoo.com>
>>>>>>>>>> *To:* "user@flink.apache.org" <user@flink.apache.org>
>>>>>>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>>>
>>>>>>>>>> Won't the upcoming CEP negation (absence of an event) feature solve this issue?
>>>>>>>>>>
>>>>>>>>>> See this discussion thread:
>>>>>>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E>
>>>>>>>>>>
>>>>>>>>>> // Atul
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* Till Rohrmann <trohrm...@apache.org>
>>>>>>>>>> *To:* user@flink.apache.org
>>>>>>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>>>
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> in case of event time, the timeout will be detected when the first watermark exceeding the timeout value is received. Thus, it depends a little bit on how you generate watermarks (e.g. periodically, or one watermark per event).
>>>>>>>>>>
>>>>>>>>>> In case of processing time, the time is only updated whenever a new element arrives. Thus, if an element arrives 4 seconds after Event A, the timeout should be detected. If the next event arrives 20 seconds later, then you won't see the timeout until then.
>>>>>>>>>>
>>>>>>>>>> In the case of processing time, we could think about registering timeout timers for processing time. However, I would highly recommend that you use event time, because with processing time Flink cannot guarantee meaningful computations, since the events might arrive out of order.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <ogd...@googlemail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> With Flink CEP, is there a way to actively listen to pattern matches that time out?
>>>>>>>>>> I am under the impression that this is not possible.
>>>>>>>>>>
>>>>>>>>>> In my case, I partition a stream containing user web navigation by "userId" to look for sequences of Event A followed by B within 4 seconds for each user.
>>>>>>>>>>
>>>>>>>>>> I registered a PatternTimeoutFunction which, assuming a non-match, only fires upon the first event after the specified timeout. For example, given user X: Event A, then 20 seconds later Event B (or any other type of event).
>>>>>>>>>>
>>>>>>>>>> I'd rather have a notification fire as soon as the 4-second interval expires, since passive invalidation is not really applicable in my case.
>>>>>>>>>>
>>>>>>>>>> How, if at all, can this be achieved with Flink CEP?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> David
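
A toy keyed detector for "A followed by B within 4 seconds" illustrates two things:

- the behaviour David describes here;
- the fix Till describes in his Nov 8 reply, which advances the NFA's time on a watermark even when no events are buffered.

This is a hedged, Flink-free approximation, not Flink's NFA. `FollowedByWithinModel` and its `advanceOnWatermark` flag are hypothetical. The model keeps at most one pending A per key and assumes elements arrive in timestamp order. It also assumes a gap of exactly 4000 ms already counts as a timeout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, Flink-free model (not Flink's NFA) of "A followed by B within
 * windowMs", keyed by user. It keeps at most one pending A per key and assumes
 * elements arrive in timestamp order.
 *
 * advanceOnWatermark=false approximates the behaviour reported in this thread:
 * a key's timeout only surfaces when a later element for that key arrives.
 * advanceOnWatermark=true also checks every pending A whenever a watermark arrives.
 */
final class FollowedByWithinModel {
    private final long windowMs;
    private final boolean advanceOnWatermark;
    private final Map<String, Long> pendingStart = new HashMap<>(); // key -> timestamp of unmatched A
    private final List<String> matches = new ArrayList<>();
    private final List<String> timeouts = new ArrayList<>();

    FollowedByWithinModel(long windowMs, boolean advanceOnWatermark) {
        this.windowMs = windowMs;
        this.advanceOnWatermark = advanceOnWatermark;
    }

    void onElement(String key, String type, long timestamp) {
        expireIfDue(key, timestamp); // a later element for the key also proves the window has passed
        Long start = pendingStart.get(key);
        if (type.equals("A") && start == null) {
            pendingStart.put(key, timestamp);
        } else if (type.equals("B") && start != null) {
            pendingStart.remove(key);
            matches.add(key + "@" + start);
        }
    }

    void onWatermark(long watermark) {
        if (!advanceOnWatermark) {
            return; // time is not advanced for keys without new elements
        }
        for (String key : new ArrayList<>(pendingStart.keySet())) {
            expireIfDue(key, watermark);
        }
    }

    private void expireIfDue(String key, long now) {
        Long start = pendingStart.get(key);
        // Assumed boundary: a gap of exactly windowMs already counts as timed out.
        if (start != null && now - start >= windowMs) {
            pendingStart.remove(key);
            timeouts.add(key + "@" + start);
        }
    }

    List<String> matches() { return matches; }

    List<String> timeouts() { return timeouts; }

    public static void main(String[] args) {
        for (boolean advance : new boolean[] {false, true}) {
            FollowedByWithinModel model = new FollowedByWithinModel(4_000L, advance);
            model.onElement("userX", "A", 0L);
            for (long wm = 1_000L; wm <= 10_000L; wm += 1_000L) {
                model.onWatermark(wm);
            }
            System.out.println("advanceOnWatermark=" + advance + ", timeouts by watermark 10000: " + model.timeouts());
            model.onElement("userX", "B", 20_000L); // the next event arrives 20 s later
            System.out.println("  after the next event at 20000: " + model.timeouts());
        }
    }
}
```

With `advanceOnWatermark=false`, user X's timeout only shows up when the next element arrives at 20000, as in David's example. With `true`, it is reported as soon as the watermark reaches 4000.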