Hi Timo,
Thanks for the information.

On Thu, Apr 2, 2020 at 9:30 PM Timo Walther <twal...@apache.org> wrote:
> Hi Manas,
>
> first of all, after assigning watermarks at the source level, usually
> Flink operators make sure to handle the watermarks.
>
> In case of a `union()`, the subsequent operator will increment its
> internal event-time clock and emit a new watermark only if all input
> streams (and their parallel instances) have reached a common event-time.
>
> Your sorting use case can be easily done with a KeyedProcessFunction
> [1]. You can buffer your events in a list state, and process them when a
> timer fires. The documentation also explains how to set a timer.
>
> If you want to fire when the next watermark arrives, you can set a timer
> like:
>
> ctx.timerService().currentWatermark() + 1
>
> The `union()` is meant for combining streams of the same data type into
> one where the order of the events does not matter. However, watermarks
> are still arriving in order, so sorting by event-time should not be a
> problem.
>
> connect() is broader than a join (see also the answer here [2]).
>
> I hope I could answer most of your questions. Feel free to ask further
> questions.
>
> Regards,
> Timo
>
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function
> [2] https://stackoverflow.com/questions/52885926/what-is-the-difference-between-flink-join-and-connect
>
>
> On 02.04.20 12:11, Manas Kale wrote:
> > Also
> >
> >   * What happens to watermarks after a union operation? Do I have to
> >     assignTimestampsAndWatermarks() again? I guess I will have to since
> >     multiple streams are being combined and Flink needs to know how to
> >     resolve individual watermarks.
> >   * What is the difference between union() and connect()?
> >
> >
> > On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <manaskal...@gmail.com> wrote:
> >
> >     Hi,
> >     I want to perform some processing on events only when the watermark
> >     is updated. Otherwise, for all other events, I want to keep
> >     buffering them till the watermark arrives.
> >     The main motivation behind doing this is that I have several
> >     operators that emit events/messages to a downstream operator. Since
> >     the order in which events arrive at the downstream operator is not
> >     guaranteed to be in chronological event time, I want to manually
> >     sort events when the watermark arrives and only then proceed.
> >
> >     Specifically, I want to first combine multiple streams and then do
> >     the above. Something like:
> >     stream1.union(stream2, stream3)...
> >
> >     One solution I am exploring is using a global window with a trigger
> >     that will fire only when the watermark updates.
> >     stream1.union(stream2, stream3).
> >         keyBy(...).
> >         window(GlobalWindows.create()).
> >         trigger(new OnWatermarkUpdateTrigger()).
> >         process(...)
> >
> >     I will store the latest watermark in the trigger's state store. In
> >     the onElement() method, I will FIRE if the current watermark is
> >     different than the stored one.
> >
> >     Is this the best way to implement the functionality described above?
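
For reference, the behaviour discussed in this thread (buffer events, then sort and release them once the watermark advances) can be modelled in a few lines of plain Java. This is only a rough sketch and deliberately does not use the Flink API: the names WatermarkSortBuffer, Event, onEvent and onWatermark are hypothetical and exist only for illustration. It assumes, following the description of `union()` above, that the combined watermark is the minimum of the watermarks seen on all inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Plain-Java model (NOT Flink API; all names are hypothetical): buffers events
 * from several inputs and releases them in event-time order whenever the
 * combined watermark advances.
 */
final class WatermarkSortBuffer {

    /** Hypothetical event type: an event-time timestamp plus a payload. */
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Last watermark seen on each input.
    private final long[] inputWatermarks;
    // Combined watermark, assumed to be the minimum over all inputs.
    private long currentWatermark = Long.MIN_VALUE;
    // Buffered events, ordered by event time (smallest timestamp first).
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    /** numInputs must be at least 1. */
    WatermarkSortBuffer(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Events are only buffered; nothing is emitted until the watermark advances. */
    void onEvent(Event e) {
        buffer.add(e);
    }

    /**
     * Records a watermark from one input. If the combined watermark advanced,
     * returns all buffered events at or below it, sorted by timestamp;
     * otherwise returns an empty list.
     */
    List<Event> onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();

        List<Event> ready = new ArrayList<>();
        if (min > currentWatermark) {
            currentWatermark = min;
            while (!buffer.isEmpty() && buffer.peek().timestamp <= currentWatermark) {
                ready.add(buffer.poll());
            }
        }
        return ready;
    }
}
```

In an actual Flink job the same idea would presumably map onto the KeyedProcessFunction approach suggested above, with the buffer held in list state and the release step running when an event-time timer fires. In this model an event that arrives with a timestamp at or below the current watermark simply stays buffered until the next watermark advance.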