Hi,

If you know that the events arrive in order and with a consistent lag, why
not just advance the watermark every time the getCurrentWatermark() method
is invoked, by the autoWatermarkInterval (or by less, to be conservative)?

You can check whether the watermark has changed since the arrival of the
last event and, if not, increment it in the getCurrentWatermark() method.
Otherwise the watermark will never increase until an element arrives, and
if the stream partition stalls for some reason the whole pipeline freezes.
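
A minimal sketch of that idea, written as plain Java without any Flink
dependency so it can be run on its own. The class name, field names and
onEvent() are made up for illustration; in a real job the two methods would
live in an AssignerWithPeriodicWatermarks implementation, with onEvent()
standing in for extractTimestamp(). It assumes in-order events and a fixed
lag, as described above.

```java
// Plain-Java sketch, no Flink dependency. All names here are hypothetical.
final class IdleAdvancingWatermark {
    private final long lagMillis;      // assumed constant lag behind event time
    private final long idleStepMillis; // advance per idle call; keep <= autoWatermarkInterval

    private long currentWatermark = Long.MIN_VALUE; // nothing seen yet
    private boolean eventSinceLastCall = false;

    IdleAdvancingWatermark(long lagMillis, long idleStepMillis) {
        this.lagMillis = lagMillis;
        this.idleStepMillis = idleStepMillis;
    }

    /** Plays the role of extractTimestamp(): called once per element. */
    long onEvent(long eventTimestampMillis) {
        // Never move the watermark backwards.
        currentWatermark = Math.max(currentWatermark, eventTimestampMillis - lagMillis);
        eventSinceLastCall = true;
        return eventTimestampMillis;
    }

    /** Plays the role of getCurrentWatermark(): called once per autoWatermarkInterval. */
    long getCurrentWatermark() {
        // Only advance on our own if no element arrived since the previous call
        // and at least one element has been seen at all.
        boolean idle = !eventSinceLastCall && currentWatermark != Long.MIN_VALUE;
        if (idle) {
            currentWatermark += idleStepMillis;
        }
        eventSinceLastCall = false;
        return currentWatermark;
    }
}
```

The trade-off is that any element arriving after a stall with a timestamp
behind the advanced watermark will be treated as late, so this only makes
sense if the in-order and fixed-lag assumptions really hold.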

Sameer


On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <till.rohrm...@gmail.com>
wrote:

> Hi David,
>
> the problem is still that there is no corresponding watermark saying that
> 4 seconds have now passed. With your code, watermarks will be periodically
> emitted but the same watermark will be emitted until a new element arrives
> which will reset the watermark. Thus, the system can never know until this
> watermark is seen whether there will be an earlier event or not. I fear
> that this is a fundamental problem with stream processing.
>
> You're right that the negation operator won't solve the problem. It will
> indeed suffer from the same problem.
>
> Cheers,
> Till
>
> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>
>> > FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>> > "not" operator) does not address this because again, how would the "not
>> > match" be triggered if no event at all occurs?
>>
>> Good question.
>>
>> I'm not sure whether the following will work:
>>
>> This could be done by creating a CEP matching pattern that uses both the
>> "notNext" (or "notFollowedBy") and "within" constructs. Something like this:
>>
>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>     .notNext("second")
>>     .within(Time.seconds(3));
>>
>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>
>> Note: I have requested that these negation patterns be implemented in
>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>
>>
>> - LF
>>
>>
>>
>>
>> ------------------------------
>> *From:* David Koch <ogd...@googlemail.com>
>> *To:* user@flink.apache.org; lg...@yahoo.com
>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>
>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>
>> Hello,
>>
>> Thank you for the explanation as well as the link to the other post.
>> Interesting to learn about some of the open JIRAs.
>>
>> Indeed, I was not using event time, but processing time. However, even
>> when using event time I only get notified of timeouts upon subsequent
>> events.
>>
>> The link <http://pastebin.com/x4m3RHQz> contains an example where I read
>> <key> <value> from a socket, wrap this in a custom "event" with timestamp,
>> key the resultant stream by <key> and attempt to detect <key> instances no
>> further than 3 seconds apart using CEP.
>>
>> Apart from the fact that results are only printed when I close the socket
>> (normal?), I don't observe any change in behaviour.
>>
>> So event-time/watermarks or not: SOME event has to occur for the timeout
>> to be triggered.
>>
>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP "not"
>> operator) does not address this because again, how would the "not match" be
>> triggered if no event at all occurs?
>>
>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>
>> The following is a better link:
>>
>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E
>>
>>
>> - LF
>>
>>
>>
>>
>> ------------------------------
>> *From:* "lg...@yahoo.com" <lg...@yahoo.com>
>> *To:* "user@flink.apache.org" <user@flink.apache.org>
>> *Sent:* Friday, October 7, 2016 3:36 PM
>>
>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>
>> Doesn't the upcoming CEP negation (absence of an event) feature solve this
>> issue?
>>
>> See this discussion thread:
>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E
>>
>>
>>
>> //  Atul
>>
>>
>> ------------------------------
>> *From:* Till Rohrmann <trohrm...@apache.org>
>> *To:* user@flink.apache.org
>> *Sent:* Friday, October 7, 2016 12:58 AM
>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>
>> Hi David,
>>
>> in the case of event time, the timeout will be detected when the first
>> watermark exceeding the timeout value is received. Thus, it depends a
>> little on how you generate watermarks (e.g. periodically, or one watermark
>> per event).
>>
>> In the case of processing time, the time is only updated whenever a new
>> element arrives. Thus, if you have an element arriving 4 seconds after
>> Event A, it should detect the timeout. If the next event arrives 20 seconds
>> later, then you won't see the timeout until then.
>>
>> In the case of processing time, we could think about registering timeout
>> timers for processing time. However, I would highly recommend using event
>> time, because with processing time Flink cannot guarantee meaningful
>> computations, since the events might arrive out of order.
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <ogd...@googlemail.com> wrote:
>>
>> Hello,
>>
>> With Flink CEP, is there a way to actively listen to pattern matches that
>> time out? I am under the impression that this is not possible.
>>
>> In my case I partition a stream containing user web navigation by
>> "userId" to look for sequences of Event A, followed by B within 4 seconds
>> for each user.
>>
>> I registered a PatternTimeoutFunction which, assuming a non-match, only
>> fires upon the first event after the specified timeout. For example, given
>> user X: Event A, 20 seconds later Event B (or any other type of event).
>>
>> I'd rather have a notification fire directly upon the 4 second interval
>> expiring since passive invalidation is not really applicable in my case.
>>
>> How, if at all, can this be achieved with Flink CEP?
>>
>> Thanks,
>>
>> David
>>
