Hi Oleg,

With the MapState approach you can still fire on every incoming element :)
You just iterate over the map state and pick all the elements whose timestamp
(the map key) lies between NOW-N and NOW, where NOW is the timestamp of the
current element.
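
To make the idea concrete, here is a rough sketch in plain Java, not actual
Flink code. A HashMap stands in for a MapState keyed by timestamp, and the
class and method names (LastNSecondsSketch, onElement, evictOlderThan) are made
up for illustration. In a real job the body of onElement would presumably live
in the processElement method of a KeyedProcessFunction, and the eviction would
be driven by a timer. The aggregation here is a simple sum; that is only an
assumption, any function over the selected elements would do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the per-key state of a keyed Flink function.
class LastNSecondsSketch {
    private final long windowMillis; // N, in the same unit as the timestamps
    // Plays the role of MapState<Long, List<Long>>: timestamp -> values seen.
    private final Map<Long, List<Long>> byTimestamp = new HashMap<>();

    LastNSecondsSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Called for every incoming element: store it, then aggregate everything
    // whose timestamp falls in [timestamp - N, timestamp], bounds inclusive.
    long onElement(long timestamp, long value) {
        byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        long sum = 0;
        // Full scan, because the map is not assumed to be ordered by key.
        for (Map.Entry<Long, List<Long>> entry : byTimestamp.entrySet()) {
            long ts = entry.getKey();
            if (ts >= timestamp - windowMillis && ts <= timestamp) {
                for (long v : entry.getValue()) {
                    sum += v;
                }
            }
        }
        return sum;
    }

    // Clean-up: drop entries that no element at or after the given time can
    // still need. In Flink this would be triggered from a timer callback.
    void evictOlderThan(long time) {
        byTimestamp.keySet().removeIf(ts -> ts < time - windowMillis);
    }
}
```

Note that a late element (one whose timestamp is older than elements already
seen) is still handled: it simply aggregates over its own [NOW-N, NOW] range,
as long as the clean-up has not yet removed the entries it needs.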
Anyway, if Fanbin's solution works, then you can always use that!

Cheers,
Kostas

On Wed, Feb 12, 2020 at 7:18 PM Олег Бонарь <olegpel...@gmail.com> wrote:
>
> Hi Kostas,
>
> Thanks for your reply!
> Yes, you understand me correctly. However, I also want the stream to be keyed
> to process it in parallel. I'm afraid the approach with MapState you
> suggested doesn't really suit my use case because I need to fire on every
> incoming event.
> Logically, Fanbin's "RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT
> ROW" looks 100% like what I need, but I haven't tried it yet.
> Also wondering if it might be expressed in DataStream API.
>
> On Wed, Feb 12, 2020 at 13:06, Kostas Kloudas <kklou...@gmail.com> wrote:
>>
>> Hi Oleg,
>>
>> Could you be more specific on what do you mean by
>> "for events of last n seconds(time units in general) for every incoming
>> event."?
>>
>> Do you mean that you have a stream of parallelism 1 and you want for
>> each incoming element to have your function fire with input the event
>> itself and all the events that arrived within the last N time units?
>> If this is the case, you can use a dummy key to key your stream to
>> have access to keyed state, then use Map State with key being the
>> timestamp and value being a list of the already seen elements with
>> that timestamp and whenever an element arrives, you can register a
>> timer to fire N time units in the future. Then, when the timer fires,
>> you can iterate over the map, fetch the elements you are interested
>> in, and clean-up whatever you will not need anymore.
>>
>> For an example you could look at [1].
>>
>> I hope this helps,
>> Kostas
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> On Tue, Feb 11, 2020 at 11:18 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
>> >
>> > can u do
>> > RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW?
>> >
>> > On Tue, Feb 11, 2020 at 12:15 PM oleg <olegpel...@gmail.com> wrote:
>> >>
>> >> Hi Community,
>> >>
>> >> I do streaming in event time and I want to preserve ordering and late
>> >> events. I have a use case where I need to fire an aggregation function
>> >> for events of last n seconds(time units in general) for every incoming
>> >> event.
>> >>
>> >> It seems to me that windowing is not suitable since it may be expressed
>> >> either in time or in events count, not "last n seconds for each single
>> >> event".
>> >>
>> >> Is there an idiomatic way to do this? Any examples or help are
>> >> appreciated. Thanks in advance.
>> >>
>> >>
>> >> Best regards,
>> >>
>> >> Oleg Bonar
>> >>