Hi Manas,

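First of all, once watermarks have been assigned at the source level, Flink operators usually take care of handling and forwarding them, so there is normally no need to call assignTimestampsAndWatermarks() again after a union.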

In case of a `union()`, the subsequent operator will increment its internal event-time clock and emit a new watermark only if all input streams (and their parallel instances) have reached a common event-time.
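To illustrate the rule above, here is a minimal plain-Java model of it. This is not Flink code and not how Flink implements it internally; the class and method names are made up for illustration. It only shows the idea that the operator's clock follows the slowest input:

```java
import java.util.Arrays;

/** Plain-Java model (NOT Flink code) of how a multi-input operator derives its watermark. */
class UnionWatermarkModel {
    // Last watermark seen on each input (hypothetical bookkeeping, one slot per input).
    private final long[] inputWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    /** Assumes numInputs >= 1. */
    UnionWatermarkModel(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input; returns true if the operator's own watermark advanced. */
    boolean onWatermark(int input, long watermark) {
        // Watermarks never move backwards on a single input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        // The operator can only be as far along as its slowest input.
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return true;
        }
        return false;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

With three inputs, watermarks 10 and 20 on the first two inputs change nothing. Once the third input reports 15, the operator's watermark advances to 10, the minimum over all inputs.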

Your sorting use case can be easily done with a KeyedProcessFunction [1]. You can buffer your events in a list state, and process them when a timer fires. The documentation also explains how to set a timer.

If you want to fire when the next watermark arrives, you can set a timer like:

ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1);
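The buffering idea itself can be sketched without any Flink dependency. The following is a plain-Java model, not a KeyedProcessFunction; the `Event` type and all method names are hypothetical. In a real job, `add()` would roughly correspond to processElement() appending to the list state, and `onWatermark()` to onTimer() reading the state back:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

/** Plain-Java model (NOT Flink code) of "buffer until the watermark passes, then emit in order". */
class WatermarkSortBuffer {

    /** Hypothetical event type: an event-time timestamp plus a payload. */
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Stands in for the list state of a keyed function.
    private final List<Event> buffer = new ArrayList<>();

    /** Counterpart of processElement(): only buffer the event, never emit it. */
    void add(Event e) {
        buffer.add(e);
    }

    /** Counterpart of onTimer(): release everything at or below the watermark, sorted by event time. */
    List<Event> onWatermark(long watermark) {
        List<Event> ready = new ArrayList<>();
        Iterator<Event> it = buffer.iterator();
        while (it.hasNext()) {
            Event e = it.next();
            if (e.timestamp <= watermark) {
                ready.add(e);
                it.remove(); // later events stay buffered for the next watermark
            }
        }
        ready.sort(Comparator.comparingLong(e -> e.timestamp));
        return ready;
    }

    /** Number of events still waiting for a later watermark. */
    int pending() {
        return buffer.size();
    }
}
```

Only events with a timestamp at or below the watermark are released, because a watermark of t states that no more events with a timestamp up to t are expected. Everything newer stays in the buffer until a later watermark arrives.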

`union()` is meant for combining streams of the same data type into one stream where the order of the events does not matter. However, watermarks still arrive in order, so sorting by event time should not be a problem.

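connect() is broader than a join: it combines two streams that may have different types and lets you handle each input in its own method while sharing state between them (see also the answer here [2]).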

I hope I could answer most of your questions. Feel free to ask further questions.

Regards,
Timo


[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function
[2] https://stackoverflow.com/questions/52885926/what-is-the-difference-between-flink-join-and-connect



On 02.04.20 12:11, Manas Kale wrote:
Also

  *   What happens to watermarks after a union operation? Do I have to
    assignTimestampsAndWatermarks() again? I guess I will have to since
    multiple streams are being combined and Flink needs to know how to
    resolve individual watermarks.
  * What is the difference between union() and connect()?


On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <manaskal...@gmail.com> wrote:

    Hi,
    I want to perform some processing on events only when the watermark
    is updated. Otherwise, for all other events, I want to keep
    buffering them till the watermark arrives.
    The main motivation behind doing this is that I have several
    operators that emit events/messages to a downstream operator. Since
    the order in which events arrive at the downstream operator is not
    guaranteed to be in chronological event time, I want to manually
    sort events when the watermark arrives and only then proceed.

    Specifically, I want to first combine multiple streams and then do
    the above. Something like :
    stream1.union(stream2, stream3)...

    One solution I am exploring is using a global window with a trigger
    that will fire only when the watermark updates.
    stream1.union(stream2, stream3).
    keyBy(...).
    window(GlobalWindows.create()).
    trigger(new OnWatermarkUpdateTrigger()).
    process(...)

    I will store the latest watermark in the trigger's state store. In
    the onElement() method, I will FIRE if the current watermark is
    different than the stored one.

    Is this the best way to implement the functionality described above?

