Also:
- What happens to watermarks after a union operation? Do I have to call assignTimestampsAndWatermarks() again? I suspect I will, since multiple streams are being combined and Flink needs to know how to resolve their individual watermarks.
- What is the difference between union() and connect()?
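For reference, Flink's documentation on event time describes an operator with several inputs (such as the one consuming a union()) as tracking the latest watermark from each input and using the minimum across inputs as its own watermark. The following is a minimal plain-Java model of that rule, written to illustrate the behaviour; it is not Flink code, and the class and method names are hypothetical.

```java
import java.util.Arrays;

/**
 * Hypothetical plain-Java model (not Flink code) of how an operator with
 * several inputs derives its watermark: it remembers the highest watermark
 * seen on each input and forwards the minimum across all inputs.
 */
class UnionWatermarkModel {
    private final long[] perInput;         // latest watermark seen per input
    private long current = Long.MIN_VALUE; // watermark forwarded downstream

    UnionWatermarkModel(int numInputs) {
        perInput = new long[numInputs];
        Arrays.fill(perInput, Long.MIN_VALUE); // no watermark received yet
    }

    /** Records a watermark from one input and returns the combined watermark. */
    long onWatermark(int input, long watermark) {
        perInput[input] = Math.max(perInput[input], watermark);
        long min = Arrays.stream(perInput).min().getAsLong();
        current = Math.max(current, min); // combined watermark never moves backwards
        return current;
    }

    public static void main(String[] args) {
        UnionWatermarkModel op = new UnionWatermarkModel(3);
        System.out.println(op.onWatermark(0, 100)); // inputs 1 and 2 still at Long.MIN_VALUE
        System.out.println(op.onWatermark(1, 50));  // input 2 still at Long.MIN_VALUE
        System.out.println(op.onWatermark(2, 80));  // min(100, 50, 80) = 50
        System.out.println(op.onWatermark(1, 120)); // min(100, 120, 80) = 80
    }
}
```

In this model the slowest input holds back the combined watermark, which is why an input that never emits watermarks keeps the result at Long.MIN_VALUE.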
On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <manaskal...@gmail.com> wrote:
> Hi,
> I want to perform some processing on events only when the watermark is
> updated. All other events should be buffered until the watermark arrives.
> The main motivation is that I have several operators that emit
> events/messages to a downstream operator. Since events are not guaranteed
> to arrive at the downstream operator in event-time order, I want to sort
> them manually when the watermark arrives and only then proceed.
>
> Specifically, I want to first combine multiple streams and then do the
> above. Something like:
> stream1.union(stream2, stream3)...
>
> One solution I am exploring is using a global window with a trigger that
> fires only when the watermark updates:
> stream1.union(stream2, stream3)
>     .keyBy(...)
>     .window(GlobalWindows.create())
>     .trigger(new OnWatermarkUpdateTrigger())
>     .process(...)
>
> I will store the latest watermark in the trigger's state store. In the
> onElement() method, I will FIRE if the current watermark is different from
> the stored one.
>
> Is this the best way to implement the functionality described above?
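To make the proposed trigger logic concrete, here is a minimal plain-Java model of the per-key behaviour: buffer every element, and when onElement() sees a watermark different from the stored one, release the buffered events sorted by timestamp. It does not use Flink's Trigger API; WatermarkSortBuffer and Event are hypothetical names, and releasing only events at or below the new watermark (keeping later ones buffered) is an assumption added here, not part of the original proposal.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical plain-Java model (not Flink's Trigger API) of the
 * "buffer until the watermark changes, then sort" idea for a single key.
 * Assumption: only events with timestamp <= new watermark are released.
 */
class WatermarkSortBuffer {
    static final class Event {
        final long timestamp;
        final String payload;
        Event(long timestamp, String payload) { this.timestamp = timestamp; this.payload = payload; }
    }

    private List<Event> buffer = new ArrayList<>();
    private long lastSeenWatermark = Long.MIN_VALUE; // stands in for the trigger's stored watermark

    /** Mirrors a check in onElement(): returns released events sorted by timestamp, or an empty list. */
    List<Event> onElement(Event e, long currentWatermark) {
        buffer.add(e);
        if (currentWatermark == lastSeenWatermark) {
            return new ArrayList<>(); // watermark unchanged: keep buffering
        }
        lastSeenWatermark = currentWatermark;
        List<Event> ready = new ArrayList<>();
        List<Event> pending = new ArrayList<>();
        for (Event b : buffer) {
            if (b.timestamp <= currentWatermark) ready.add(b); else pending.add(b);
        }
        buffer = pending; // events beyond the watermark wait for a later firing
        ready.sort(Comparator.comparingLong((Event ev) -> ev.timestamp));
        return ready;
    }

    public static void main(String[] args) {
        WatermarkSortBuffer buf = new WatermarkSortBuffer();
        buf.onElement(new Event(30, "c"), Long.MIN_VALUE); // no watermark yet: buffered
        buf.onElement(new Event(10, "a"), Long.MIN_VALUE); // still buffered
        List<Event> out = buf.onElement(new Event(50, "d"), 35); // watermark moved to 35
        for (Event ev : out) System.out.println(ev.timestamp + " " + ev.payload); // "10 a", then "30 c"
    }
}
```

Because the check happens only when an element arrives, a watermark advance followed by no further elements releases nothing in this model; the buffered events wait for the next element on that key.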