I will give it a try. My current time/watermark assigner extends AscendingTimestampExtractor, so I can't override how the watermark is set to the last seen event timestamp.
Thanks for your replies.

/David

On Tue, Oct 11, 2016 at 6:17 PM, Till Rohrmann <[email protected]> wrote:

> But then no element later than the last emitted watermark must be issued
> by the sources. If that is the case, then this solution should work.
>
> Cheers,
> Till
>
> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <[email protected]> wrote:
>
>> Hi,
>>
>> If you know that the events are arriving in order and with a consistent
>> lag, why not just increment the watermark time every time the
>> getCurrentWatermark() method is invoked, based on the
>> autoWatermarkInterval (or less, to be conservative)?
>>
>> You can check if the watermark has changed since the arrival of the last
>> event and, if not, increment it in the getCurrentWatermark() method.
>> Otherwise the watermark will never increase until an element arrives, and
>> if the stream partition stalls for some reason the whole pipeline freezes.
>>
>> Sameer
>>
>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <[email protected]>
>> wrote:
>>
>>> Hi David,
>>>
>>> the problem is still that there is no corresponding watermark saying
>>> that 4 seconds have now passed. With your code, watermarks will be
>>> periodically emitted but the same watermark will be emitted until a new
>>> element arrives which will reset the watermark. Thus, the system can never
>>> know until this watermark is seen whether there will be an earlier event or
>>> not. I fear that this is a fundamental problem with stream processing.
>>>
>>> You're right that the negation operator won't solve the problem. It will
>>> indeed suffer from the same problem.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, Oct 9, 2016 at 7:37 PM, <[email protected]> wrote:
>>>
>>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>> "not" operator) does not address this because again, how would the "not
>>>> match" be triggered if no event at all occurs?
>>>>
>>>> Good question.
>>>>
>>>> I'm not sure whether the following will work:
>>>>
>>>> This could be done by creating a CEP matching pattern that uses both
>>>> the "notNext" (or "notFollowedBy") and "within" constructs. Something
>>>> like this:
>>>>
>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>     .notNext("second")
>>>>     .within(Time.seconds(3));
>>>>
>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>
>>>> Note: I have requested these negation patterns to be implemented in
>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>>
>>>> - LF
>>>>
>>>> ------------------------------
>>>> *From:* David Koch <[email protected]>
>>>> *To:* [email protected]; [email protected]
>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>
>>>> Hello,
>>>>
>>>> Thank you for the explanation as well as the link to the other post.
>>>> Interesting to learn about some of the open JIRAs.
>>>>
>>>> Indeed, I was not using event time, but processing time. However, even
>>>> when using event time I only get notified of timeouts upon subsequent
>>>> events.
>>>>
>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I
>>>> read <key> <value> from a socket, wrap this in a custom "event" with
>>>> timestamp, key the resultant stream by <key> and attempt to detect <key>
>>>> instances no further than 3 seconds apart using CEP.
>>>>
>>>> Apart from the fact that results are only printed when I close the
>>>> socket (normal?) I don't observe any change in behaviour.
>>>>
>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>> timeout to be triggered.
>>>>
>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>> "not" operator) does not address this because again, how would the "not
>>>> match" be triggered if no event at all occurs?
>>>>
>>>> On Sat, Oct 8, 2016 at 12:50 AM, <[email protected]> wrote:
>>>>
>>>> The following is a better link:
>>>>
>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E>
>>>>
>>>> - LF
>>>>
>>>> ------------------------------
>>>> *From:* "[email protected]" <[email protected]>
>>>> *To:* "[email protected]" <[email protected]>
>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>
>>>> Doesn't the upcoming CEP negation (absence of an event) feature solve
>>>> this issue?
>>>>
>>>> See this discussion thread:
>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E>
>>>>
>>>> // Atul
>>>>
>>>> ------------------------------
>>>> *From:* Till Rohrmann <[email protected]>
>>>> *To:* [email protected]
>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>
>>>> Hi David,
>>>>
>>>> in case of event time, the timeout will be detected when the first
>>>> watermark exceeding the timeout value is received. Thus, it depends a
>>>> little bit on how you generate watermarks (e.g. periodically, watermark
>>>> per event).
>>>>
>>>> In case of processing time, the time is only updated whenever a new
>>>> element arrives. Thus, if you have an element arriving 4 seconds after
>>>> Event A, it should detect the timeout. If the next event arrives 20 seconds
>>>> later, then you won't see the timeout until then.
>>>>
>>>> In the case of processing time, we could think about registering
>>>> timeout timers for processing time. However, I would highly recommend
>>>> that you use event time, because with processing time, Flink cannot
>>>> guarantee meaningful computations, since the events might arrive out of
>>>> order.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <[email protected]>
>>>> wrote:
>>>>
>>>> Hello,
>>>>
>>>> With Flink CEP, is there a way to actively listen to pattern matches
>>>> that time out? I am under the impression that this is not possible.
>>>>
>>>> In my case I partition a stream containing user web navigation by
>>>> "userId" to look for sequences of Event A, followed by B within 4 seconds
>>>> for each user.
>>>>
>>>> I registered a PatternTimeoutFunction which, assuming a non-match, only
>>>> fires upon the first event after the specified timeout. For example, given
>>>> user X: Event A, 20 seconds later Event B (or any other type of event).
>>>>
>>>> I'd rather have a notification fire directly upon the 4 second interval
>>>> expiring since passive invalidation is not really applicable in my case.
>>>>
>>>> How, if at all, can this be achieved with Flink CEP?
>>>>
>>>> Thanks,
>>>>
>>>> David
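
For reference, here is a minimal sketch of the idea Sameer describes in the thread: keep moving the watermark forward on every periodic call when no event has arrived since the previous call. It is plain Java with no Flink dependency, so the logic can be read and run on its own. The class name `IdleAwareWatermark`, its methods `onEvent` and `currentWatermark`, and the 200 ms step are all hypothetical and not part of any Flink API. In a Flink job of that era the two methods would presumably be called from `extractTimestamp()` and `getCurrentWatermark()` of a class implementing `AssignerWithPeriodicWatermarks`, instead of extending `AscendingTimestampExtractor`. The sketch assumes events arrive in timestamp order, and Till's caveat applies: once the watermark has been advanced during an idle period, any event with an earlier timestamp would be late.

```java
/**
 * Hypothetical helper (not a Flink class): tracks a watermark that keeps
 * advancing while the stream is idle. Assumes ascending event timestamps.
 */
final class IdleAwareWatermark {
    // How far to advance the watermark on each idle call. This would be
    // chosen at or below the configured auto-watermark interval.
    private final long idleStepMillis;
    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;
    private boolean eventSinceLastCall = false;

    IdleAwareWatermark(long idleStepMillis) {
        this.idleStepMillis = idleStepMillis;
    }

    /** Records the timestamp of an arriving event. */
    void onEvent(long timestampMillis) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestampMillis);
        eventSinceLastCall = true;
    }

    /** Intended to be called once per watermark interval. */
    long currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            // No event seen yet, so there is nothing to advance from.
            return Long.MIN_VALUE;
        }
        long next;
        if (eventSinceLastCall) {
            // Follow the newest event, but never move the watermark backwards.
            next = Math.max(lastEmitted, maxSeenTimestamp);
        } else {
            // Stream is idle: advance by one step so timeouts can still fire.
            next = lastEmitted + idleStepMillis;
        }
        eventSinceLastCall = false;
        lastEmitted = next;
        return next;
    }

    public static void main(String[] args) {
        IdleAwareWatermark wm = new IdleAwareWatermark(200L);
        wm.onEvent(1000L);                         // "Event A" at t = 1000 ms
        System.out.println(wm.currentWatermark()); // 1000
        System.out.println(wm.currentWatermark()); // 1200, no new event
        System.out.println(wm.currentWatermark()); // 1400, no new event
    }
}
```

With a 4 second `within` window and this step size, the watermark would pass the timeout after roughly twenty idle calls, without waiting for Event B. Whether that is safe depends entirely on the sources never emitting an element older than the advanced watermark.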
