Hi Dominik,

you can control FlinkCEP's consumption behaviour via the after match skip
strategies [1]. They allow you to control how Flink treats events after a
match has occurred.

If you are interested in the longest possible window of events exceeding
your threshold, then you could also add terminating event which is below
the threshold. Only then you can be sure that any following event won't
continue the window.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#after-match-skip-strategy

Cheers,
Till

On Fri, Feb 21, 2020 at 10:56 AM Dominik Wosiński <wos...@gmail.com> wrote:

> Hey,
> I have a question regarding CEP, assume I have a stream of readings from
> various sensors. The application is running in EventTime, so according to
> the CEP docs the events are buffered and sorted by timestamp ascending.
>
> So, I want to record the situations when reading from the sensor goes above
> some threshold. But what I am interested in is to have a whole match for
> the period when the event was above the threshold.
>
> I tried to implement a single pattern that was more or less something:
>
>
> Pattern.begin[Reading]("beginning")
>   .where(_.data() <  Threshold)
>
>   .oneOrMore
>
>   .greedy
>
>   .consecutive
>
>
>
> But now it produces multiple partial matches that I can't eliminate. For
> example for threshold = 350, I have a stream:
>
> 300, 400, 500, 300
>
> And then I get the following lists of events [400], [400, 500], [500].
>
> Is there a way to eliminate those partial matches ??
>
> Best Regards,
> Dom.
>

Reply via email to