Hi Chengzhi,

Functions in Flink are implemented so that they either preserve the timestamps of elements or assign timestamps that are aligned with the existing watermarks. For example, the result of a time window aggregation carries the end timestamp of the window, and records emitted from the onTimer() method carry the timestamp of the timer. So unless you use internal APIs to reset the record timestamps of elements, you don't need to worry about generating new watermarks.
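To make the 3-hour versus 6-hour question concrete, here is a minimal plain-Java sketch of the watermark arithmetic. It does not use the Flink API. It assumes that each source watermark is the maximum seen timestamp minus maxOutOfOrderness (as in the BoundedOutOfOrdernessGenerator example from the docs) and that a two-input operator advances its event time to the minimum of its two input watermarks. The class name, the method names, and the "10 hours" value are hypothetical and chosen only for illustration.

```java
import java.time.Duration;

// Plain-Java illustration only; WatermarkSketch and its methods are
// hypothetical names, not part of the Flink API.
final class WatermarkSketch {

    // Assumed behavior of a bounded-out-of-orderness generator:
    // watermark = max timestamp seen so far - maxOutOfOrderness.
    public static long boundedOutOfOrdernessWatermark(long maxSeenTimestampMs,
                                                      Duration maxOutOfOrderness) {
        return maxSeenTimestampMs - maxOutOfOrderness.toMillis();
    }

    // Assumed behavior of a two-input operator (e.g. connect + CoProcessFunction):
    // its event time is the minimum of the two input watermarks.
    public static long twoInputWatermark(long leftWatermarkMs, long rightWatermarkMs) {
        return Math.min(leftWatermarkMs, rightWatermarkMs);
    }

    public static void main(String[] args) {
        Duration bound = Duration.ofHours(3);
        // Hypothetical: A, B and D have all seen events up to hour 10.
        long maxSeen = Duration.ofHours(10).toMillis();

        long wmA = boundedOutOfOrdernessWatermark(maxSeen, bound);
        long wmB = boundedOutOfOrdernessWatermark(maxSeen, bound);
        long wmD = boundedOutOfOrdernessWatermark(maxSeen, bound);

        long wmC = twoInputWatermark(wmA, wmB); // A + B -> C
        long wmE = twoInputWatermark(wmC, wmD); // C + D -> E

        long lagHours = Duration.ofMillis(maxSeen - wmE).toHours();
        System.out.println("E lags by " + lagHours + " hours"); // prints "E lags by 3 hours"
    }
}
```

Under these assumptions the delay is applied once at the sources and is not added again at each connect, so E ends up 3 hours behind rather than 6.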
Best,
Fabian

2018-04-25 20:20 GMT+02:00 Chengzhi Zhao <w.zhaocheng...@gmail.com>:

> Hi, everyone,
>
> I am trying to build a join-like pipeline using the Flink connect operator and
> CoProcessFunction. I have a use case where I need to connect 3+ streams, so I
> have something like this:
>
> A
>   ===> C
> B        ==> E
>        D
>
> So two streams, A and B, are connected first with a 3-hour delay on the low
> watermark. After data has been emitted (the output stream C), a new stream
> D is connected to C and emits E as the final output. I was wondering how the
> downstream watermark should be defined. Should I give stream C a new
> watermark with a 3-hour delay again? Or, when I connect stream D, will
> everything be 6 hours late on the low watermark?
>
> I am using BoundedOutOfOrdernessGenerator [1] with a maxOutOfOrderness of
> 3 hours.
>
> Thanks in advance for your tips and help.
>
> Best,
> Chengzhi
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks