Hi all

Any recommendation about the issue?

Regards

25 Tem 2019 Per 22:37 tarihinde Ceyhan Kasap <ceyhanka...@gmail.com> şunu
yazdı:

> Hi,
>
> I am having quite hard time to understand flink windowing principals and
> would be very pleased if you could point me in the right direction.
>
> My purpose is to count the number of recurring events for a time interval
> and generate alert events if the number of recurring events is greater than
> a threshold.
>
> As I understand, windowing is a perfect match for this scenario.
>
> Additional requirement is to generate an early alert if  recurring events
> count in a window is 2 (i.e. alert should be generated without waiting
> window end).
>
> I thought that an alert event generating process window function can be
> used to aggregate windowed events and a custom trigger can be used to emit
> early results from the window based on the recurring events count (before
> the watermark reaches the window’s end timestamp).
>
> I am using event-time semantics and having problems/questions for the
> custom trigger .
>
> You can find the actual implementation in the gist:
>
> https://gist.github.com/simpleusr/7c56d4384f6fc9f0a61860a680bb5f36
>
>
> I am using keyed state to keep track of element count in the window
> (encounteredElementsCountState)
>
> Upon receiving first element I register EventTimeTimer to the window end.
> This is supposed to trigger FIRE_AND_PURGE for window closing and working
> as expected.
>
> If the count exceeds threshold , I try to trigger early fire. This also
> seems to be successful, processwindow function is called immediately after
> this firing.
>
> The problem is, I had to insert below check to the code without
> understanding the reason. Because the previously collected elements were
> again supplied to onElement method ...
>
>
>
>
>
> *   if (ctx.getCurrentWatermark() < 0) {
> logger.debug(String.format("onElement processing skipped for eventId : %s
> for watermark: %s ", element.getEventId(), ctx.getCurrentWatermark()));
>             return TriggerResult.CONTINUE;            }*
>
>
> I could not figure out  the reason. What I see is that when this happens
> the watermark value is (ctx.getCurrentWatermark()) Long.MIN_VALUE ( that
> leaded to the above check) . How can this happen ?
>
> This check seems to avoid duplicate early event generation, but I do not
> know why this happens and is this workaround is appropriate.
>
> Could you please advice why the same elements are processed twice in the
> window?
>
> Another question is about the keyed state usage. Does this implementation
> leaks any state after window is disposed? I am trying to clear all used
> states in clear method of the trigger but would that be enough?
>
>
> Regards
>

Reply via email to