Hi Manas,
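First of all, once timestamps and watermarks have been assigned at the source, Flink operators take care of propagating the watermarks, so you usually don't need to call assignTimestampsAndWatermarks() again after a union.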
In the case of a `union()`, the subsequent operator advances its internal event-time clock and emits a new watermark only once all input streams (and all of their parallel instances) have reached a common event time. In other words, its watermark is the minimum of the watermarks of all its inputs.
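To make the rule above concrete, here is a plain-Java model of how an operator downstream of a union tracks watermarks. It is not Flink code: the class name `UnionWatermarkModel` is hypothetical, and details such as idle inputs are ignored. It remembers the latest watermark per input channel and only advances its own watermark when the minimum across all channels increases.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical model, not Flink's API: one operator with several input channels
// (e.g. the parallel instances of all unioned streams).
class UnionWatermarkModel {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    UnionWatermarkModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No channel has reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the operator's new
    // watermark if (and only if) the minimum over all channels advanced.
    OptionalLong onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

With two channels, a watermark of 10 on channel 0 emits nothing (channel 1 has not reported yet). A subsequent 5 on channel 1 emits 5, and 20 on channel 1 then emits 10, because channel 0 is still the slowest.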
Your sorting use case can easily be implemented with a KeyedProcessFunction [1]: buffer your events in a list state and process them when a timer fires. The documentation also explains how to register a timer.
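The buffer-and-sort logic can be sketched as a plain-Java model of what the KeyedProcessFunction would do for a single key. The names `BufferedEvent` and `SortingBuffer` are hypothetical. In Flink, the list would be a ListState, `add()` would correspond to processElement(), and `flush()` to onTimer() with the current watermark.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event type, used only for this sketch.
class BufferedEvent {
    final long timestamp;
    final String payload;

    BufferedEvent(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

// Plain-Java model of the per-key logic (not Flink's API).
class SortingBuffer {
    // Stands in for a ListState holding not-yet-emitted events.
    private final List<BufferedEvent> buffer = new ArrayList<>();

    // processElement(): only buffer the event.
    void add(BufferedEvent event) {
        buffer.add(event);
    }

    // onTimer(): emit every event the watermark has passed (timestamp <= watermark),
    // sorted by event time, and keep later events buffered for a future timer.
    List<BufferedEvent> flush(long watermark) {
        List<BufferedEvent> ready = new ArrayList<>();
        List<BufferedEvent> pending = new ArrayList<>();
        for (BufferedEvent event : buffer) {
            if (event.timestamp <= watermark) {
                ready.add(event);
            } else {
                pending.add(event);
            }
        }
        ready.sort(Comparator.comparingLong(ev -> ev.timestamp));
        buffer.clear();
        buffer.addAll(pending);
        return ready;
    }
}
```

Events are emitted only once the watermark guarantees that no earlier event should still arrive, which is what makes the output order reliable.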
If you want the timer to fire when the next watermark arrives, you can register an event-time timer like this:
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1);
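Why `currentWatermark() + 1` works can be seen in a small plain-Java model of event-time timers for one key. The class `EventTimeTimerModel` is hypothetical and only mimics the relevant behavior: a timer fires once the watermark reaches its timestamp, and timers with the same timestamp are deduplicated (a set is used here for that).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of event-time timers for a single key (not Flink's TimerService).
class EventTimeTimerModel {
    // Sorted set: registering the same timestamp twice yields a single timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    long currentWatermark() {
        return currentWatermark;
    }

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advances the watermark and returns the timers that fire, in timestamp order.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```

With the watermark at 100, a timer registered at 101 does not fire while the watermark stays at 100, but fires as soon as any later watermark arrives (say 150), however far it jumps. Registering it again for every buffered element of the same key does not produce duplicate firings.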
`union()` is meant for combining streams of the same type into one stream, where the order of the events does not matter. However, watermarks still arrive in order (they only ever increase), so sorting by event time should not be a problem.
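`connect()` is more general than a join: it combines two streams that may have different types and lets a single function (e.g., a CoProcessFunction) process both inputs with shared state (see also the answer in [2]).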
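As a loose plain-Java analogy (hypothetical names, not Flink's API): `union()` merges inputs of one element type into a single undifferentiated input, while `connect()` keeps two possibly differently typed inputs apart and hands each to its own callback, comparable to processElement1/processElement2 of a CoProcessFunction, with both callbacks seeing the same state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

final class UnionVsConnect {
    private UnionVsConnect() {}

    // union(): inputs of the SAME element type become one input;
    // downstream code can no longer tell which input an element came from.
    static <T> List<T> union(List<T> first, List<T> second) {
        List<T> merged = new ArrayList<>(first);
        merged.addAll(second);
        return merged;
    }

    // connect(): two inputs that may have DIFFERENT element types stay distinguishable.
    // Each side has its own handler and both handlers share one state object.
    // Simplification: the left input is processed before the right one here,
    // whereas in Flink the two inputs are interleaved in arrival order.
    static <A, B, S> S connect(List<A> left, List<B> right, S sharedState,
                               BiConsumer<A, S> onLeft, BiConsumer<B, S> onRight) {
        for (A a : left) {
            onLeft.accept(a, sharedState);
        }
        for (B b : right) {
            onRight.accept(b, sharedState);
        }
        return sharedState;
    }
}
```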
I hope I could answer most of your questions. Feel free to ask further
questions.
Regards,
Timo
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#process-function
[2] https://stackoverflow.com/questions/52885926/what-is-the-difference-between-flink-join-and-connect
On 02.04.20 12:11, Manas Kale wrote:
Also
* What happens to watermarks after a union operation? Do I have to
call assignTimestampsAndWatermarks() again? I guess I will have to, since
multiple streams are being combined and Flink needs to know how to
resolve the individual watermarks.
* What is the difference between union() and connect()?
On Thu, Apr 2, 2020 at 10:33 AM Manas Kale <manaskal...@gmail.com> wrote:
Hi,
I want to perform some processing on events only when the watermark
is updated. All other events should be buffered until the watermark
arrives.
The main motivation behind doing this is that I have several
operators that emit events/messages to a downstream operator. Since
the order in which events arrive at the downstream operator is not
guaranteed to be in chronological event time, I want to manually
sort events when the watermark arrives and only then proceed.
Specifically, I want to first combine multiple streams and then do
the above, something like:
stream1.union(stream2, stream3)...
One solution I am exploring is using a global window with a trigger
that fires only when the watermark updates:
stream1.union(stream2, stream3)
    .keyBy(...)
    .window(GlobalWindows.create())
    .trigger(new OnWatermarkUpdateTrigger())
    .process(...);
I will store the latest watermark in the trigger's state. In
the onElement() method, I will FIRE if the current watermark is
different from the stored one.
Is this the best way to implement the functionality described above?
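The decision the proposed OnWatermarkUpdateTrigger would make in onElement() can be modeled in plain Java. `ModelTriggerResult` and `WatermarkChangeTriggerModel` are hypothetical stand-ins, not Flink's Trigger API. The stored value starts at Long.MIN_VALUE, which is the watermark before any watermark has been received.

```java
// Hypothetical stand-in for Flink's TriggerResult, reduced to the two outcomes used here.
enum ModelTriggerResult { CONTINUE, FIRE }

// Plain-Java model of the onElement() decision of the proposed trigger.
class WatermarkChangeTriggerModel {
    // Stands in for the value kept in the trigger's state.
    private long lastSeenWatermark = Long.MIN_VALUE;

    // Called once per incoming element with the current watermark;
    // fires only if the watermark changed since the previous element.
    ModelTriggerResult onElement(long currentWatermark) {
        if (currentWatermark != lastSeenWatermark) {
            lastSeenWatermark = currentWatermark;
            return ModelTriggerResult.FIRE;
        }
        return ModelTriggerResult.CONTINUE;
    }
}
```

Because this check only runs in onElement(), a watermark advance is acted upon only when the next element for that key arrives. An event-time timer in a KeyedProcessFunction, by contrast, is triggered by the watermark itself.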