Hi all Any recommendation about the issue?
Regards 25 Tem 2019 Per 22:37 tarihinde Ceyhan Kasap <ceyhanka...@gmail.com> şunu yazdı: > Hi, > > I am having quite hard time to understand flink windowing principals and > would be very pleased if you could point me in the right direction. > > My purpose is to count the number of recurring events for a time interval > and generate alert events if the number of recurring events is greater than > a threshold. > > As I understand, windowing is a perfect match for this scenario. > > Additional requirement is to generate an early alert if recurring events > count in a window is 2 (i.e. alert should be generated without waiting > window end). > > I thought that an alert event generating process window function can be > used to aggregate windowed events and a custom trigger can be used to emit > early results from the window based on the recurring events count (before > the watermark reaches the window’s end timestamp). > > I am using event-time semantics and having problems/questions for the > custom trigger . > > You can find the actual implementation in the gist: > > https://gist.github.com/simpleusr/7c56d4384f6fc9f0a61860a680bb5f36 > > > I am using keyed state to keep track of element count in the window > (encounteredElementsCountState) > > Upon receiving first element I register EventTimeTimer to the window end. > This is supposed to trigger FIRE_AND_PURGE for window closing and working > as expected. > > If the count exceeds threshold , I try to trigger early fire. This also > seems to be successful, processwindow function is called immediately after > this firing. > > The problem is, I had to insert below check to the code without > understanding the reason. Because the previously collected elements were > again supplied to onElement method ... > > > > > > * if (ctx.getCurrentWatermark() < 0) { > logger.debug(String.format("onElement processing skipped for eventId : %s > for watermark: %s ", element.getEventId(), ctx.getCurrentWatermark())); > return TriggerResult.CONTINUE; }* > > > I could not figure out the reason. What I see is that when this happens > the watermark value is (ctx.getCurrentWatermark()) Long.MIN_VALUE ( that > leaded to the above check) . How can this happen ? > > This check seems to avoid duplicate early event generation, but I do not > know why this happens and is this workaround is appropriate. > > Could you please advice why the same elements are processed twice in the > window? > > Another question is about the keyed state usage. Does this implementation > leaks any state after window is disposed? I am trying to clear all used > states in clear method of the trigger but would that be enough? > > > Regards >