Hi,

I am having quite a hard time understanding Flink windowing principles and
would be very pleased if you could point me in the right direction.

My goal is to count the number of recurring events within a time interval
and generate alert events if that count is greater than a threshold.
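The rule itself can be stated without Flink. Below is a minimal plain-Java sketch, assuming tumbling event-time windows with zero offset, non-negative millisecond timestamps, and alerts only for counts strictly above the threshold. `RecurringEvent` and `WindowedCountAlert` are hypothetical names that model what a `keyBy(...)` plus tumbling event-time window count would produce; this is not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical input event: a key (e.g. the recurring event id) and an event timestamp in ms. */
class RecurringEvent {
    final String key;
    final long timestamp;

    RecurringEvent(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

class WindowedCountAlert {
    /** Start of the tumbling window containing a non-negative timestamp (zero offset). */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    /** Returns "key@windowStart:count" for every (key, window) whose count exceeds the threshold. */
    static List<String> alerts(List<RecurringEvent> events, long windowSize, int threshold) {
        // Count per (key, window start); TreeMap keeps the output order deterministic.
        Map<String, Integer> counts = new TreeMap<>();
        for (RecurringEvent e : events) {
            String slot = e.key + "@" + windowStart(e.timestamp, windowSize);
            counts.merge(slot, 1, Integer::sum);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > threshold) {
                result.add(entry.getKey() + ":" + entry.getValue());
            }
        }
        return result;
    }
}
```

With one-minute windows and a threshold of 2, three "a" events in the first minute produce one alert, while a single "b" event or a lone "a" in the next minute produce none.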

As I understand it, windowing is a perfect match for this scenario.

An additional requirement is to generate an early alert if the recurring
event count in a window reaches 2 (i.e. the alert should be generated
without waiting for the window to end).

I thought that a ProcessWindowFunction that generates alert events could be
used to aggregate the windowed events, and that a custom trigger could be
used to emit early results from the window based on the recurring event
count (before the watermark reaches the window’s end timestamp).
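One detail that matters for this design: in Flink, `TriggerResult.FIRE` evaluates the window function but keeps the window contents, while `FIRE_AND_PURGE` also removes them. So after an early `FIRE`, the final firing at the window end hands the ProcessWindowFunction all elements again, including the ones it already saw. The plain-Java model below shows this; `Result` and `SimulatedWindow` are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in mirroring the names of Flink's TriggerResult values. */
enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

/** Hypothetical model of one window's buffered contents and its window function calls. */
class SimulatedWindow {
    private final List<String> contents = new ArrayList<>();
    /** Each entry is the list of elements that one window function call received. */
    final List<List<String>> invocations = new ArrayList<>();

    void add(String element) {
        contents.add(element);
    }

    /** Applies a trigger result roughly the way the window operator does. */
    void apply(Result result) {
        if (result == Result.FIRE || result == Result.FIRE_AND_PURGE) {
            // The window function always sees the full current contents.
            invocations.add(new ArrayList<>(contents));
        }
        if (result == Result.FIRE_AND_PURGE) {
            contents.clear();
        }
    }
}
```

In this model the repetition shows up in the window function's input, not in `Trigger.onElement`, which Flink calls once per incoming element for each window the element is assigned to. Seeing old elements in `onElement` again therefore points to a different cause.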

I am using event-time semantics and have problems/questions about the
custom trigger.

You can find the actual implementation in the gist:

https://gist.github.com/simpleusr/7c56d4384f6fc9f0a61860a680bb5f36


I am using keyed state (encounteredElementsCountState) to keep track of the
element count in the window.
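In Flink, state that a trigger obtains through `ctx.getPartitionedState(...)` is scoped to the current key and the current window, so every (key, window) pair gets its own counter. A hypothetical plain-Java model of that scoping follows; `ScopedCounterState` is illustrative only and not a Flink class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of trigger state scoped per key and per window. */
class ScopedCounterState {
    private final Map<String, Long> values = new HashMap<>();

    /** Builds the scope identifier from the key and the window bounds [start, end). */
    private static String scope(String key, long windowStart, long windowEnd) {
        return key + "/[" + windowStart + "," + windowEnd + ")";
    }

    /** Increments the counter of one (key, window) pair and returns the new value. */
    long increment(String key, long windowStart, long windowEnd) {
        return values.merge(scope(key, windowStart, windowEnd), 1L, Long::sum);
    }

    /** Number of distinct (key, window) counters currently held. */
    int size() {
        return values.size();
    }
}
```

Counts for the same key in different windows, or for different keys in the same window, never interfere with each other.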

Upon receiving the first element, I register an event-time timer for the
window end. This timer is supposed to trigger FIRE_AND_PURGE when the window
closes, and it works as expected.
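For comparison, Flink's built-in `EventTimeTrigger` registers its timer at `window.maxTimestamp()` (the window end minus 1 ms), and an event-time timer fires once the watermark reaches or passes its timestamp. The sketch below is a hypothetical plain-Java model of that behavior; `EventTimeTimers` is illustrative, not Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of event-time timers driven by watermarks. */
class EventTimeTimers {
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** A set keeps one timer per timestamp; in Flink, duplicates for the same key and window collapse too. */
    void register(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark and returns, in order, the timers that fire (timestamp <= watermark). */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```

Because duplicate registrations collapse, registering the end-of-window timer on every element, as `EventTimeTrigger` does, is harmless.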

If the count exceeds the threshold, I trigger an early fire. This also seems
to work: the ProcessWindowFunction is called immediately after this firing.
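For the early alert, comparing the count with `==` rather than `>=` makes the trigger fire early exactly once per window, even if more elements arrive afterwards. A hypothetical sketch of only the decision logic, mirroring `onElement` and `onEventTime`, with the early count of 2 from the requirement above; `Decision` and `EarlyFireDecision` are plain-Java stand-ins, not Flink code.

```java
/** Hypothetical stand-in mirroring the names of Flink's TriggerResult values. */
enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

/** Hypothetical model of the trigger's decisions for one (key, window). */
class EarlyFireDecision {
    private final long earlyCount;
    private final long windowMaxTimestamp;
    private long count; // in Flink this would live in per-window trigger state

    EarlyFireDecision(long earlyCount, long windowMaxTimestamp) {
        this.earlyCount = earlyCount;
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    /** Called once per element: fire only at the moment the count reaches the early threshold. */
    Decision onElement() {
        count++;
        return count == earlyCount ? Decision.FIRE : Decision.CONTINUE;
    }

    /** Called when an event-time timer fires: emit the final result at the window end. */
    Decision onEventTime(long time) {
        return time == windowMaxTimestamp ? Decision.FIRE_AND_PURGE : Decision.CONTINUE;
    }
}
```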

The problem is that I had to insert the check below into the code without
understanding why: the previously collected elements were supplied to the
onElement method again.

    if (ctx.getCurrentWatermark() < 0) {
        // Skip elements that arrive while the watermark is still Long.MIN_VALUE
        logger.debug(String.format(
                "onElement processing skipped for eventId : %s for watermark: %s ",
                element.getEventId(), ctx.getCurrentWatermark()));
        return TriggerResult.CONTINUE;
    }

I could not figure out the reason. What I see is that when this happens, the
watermark value (ctx.getCurrentWatermark()) is Long.MIN_VALUE (which led to
the check above). How can this happen?

This check seems to prevent duplicate early alerts, but I do not know why the
duplication happens or whether this workaround is appropriate.
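On the `Long.MIN_VALUE` watermark: Flink starts an operator's current watermark at `Long.MIN_VALUE`, and it only advances when a watermark arrives from upstream. With periodic watermarks (the default interval is 200 ms), elements processed right after the job starts, or right after a restart that replays data from the last checkpoint, see `Long.MIN_VALUE`. A restart would also be one possible explanation for old elements reaching `onElement` again, so the job's restart history may be worth checking; this is a guess, not a diagnosis. Note also that `getCurrentWatermark() < 0` skips every element seen before the first watermark, not only duplicates. The hypothetical model below (`PeriodicWatermarks`, loosely following the bounded-out-of-orderness idea, not Flink's class) shows the initial value:

```java
/** Hypothetical model of a periodic bounded-out-of-orderness watermark generator. */
class PeriodicWatermarks {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;
    private long emittedWatermark = Long.MIN_VALUE; // what downstream operators currently see

    PeriodicWatermarks(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Called per element: only tracks the highest timestamp, emits nothing. */
    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Called on each periodic tick: emits maxTimestamp - maxOutOfOrderness, never moving backwards. */
    void onPeriodicEmit() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // no element seen yet, nothing to emit
        }
        emittedWatermark = Math.max(emittedWatermark, maxTimestamp - maxOutOfOrderness);
    }

    long currentWatermark() {
        return emittedWatermark;
    }
}
```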

Could you please advise why the same elements are processed twice in the
window?

Another question is about keyed state usage. Does this implementation leak
any state after the window is disposed? I try to clear all the state I use
in the trigger's clear method, but would that be enough?
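For reference, Flink calls `Trigger.clear()` when a window is disposed (at `window.maxTimestamp()` plus the allowed lateness for event time), and the trigger is expected to release its own per-window state there, typically by clearing its partitioned state and deleting the timers it registered. The window contents are cleaned up by the window operator itself. One thing `Trigger.clear()` does not cover is per-window state used inside a ProcessWindowFunction via `context.windowState()`; that needs the function's own `clear(Context)` override. The hypothetical plain-Java model below (`CountingTriggerState` is illustrative, not Flink code) checks that clearing both the counter and the timer leaves nothing behind:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of what a counting trigger holds for its windows. */
class CountingTriggerState {
    final Map<Long, Long> countByWindowEnd = new HashMap<>();
    final Set<Long> timers = new HashSet<>();

    /** Mirrors onElement: bump the window's counter and make sure its end timer exists. */
    void onElement(long windowEnd) {
        countByWindowEnd.merge(windowEnd, 1L, Long::sum);
        timers.add(windowEnd - 1); // corresponds to window.maxTimestamp()
    }

    /** Mirrors Trigger.clear: release everything this trigger created for the window. */
    void clear(long windowEnd) {
        countByWindowEnd.remove(windowEnd);
        timers.remove(windowEnd - 1);
    }

    boolean holdsNothing() {
        return countByWindowEnd.isEmpty() && timers.isEmpty();
    }
}
```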


Regards
