Yes, Chengzhi. That’s exactly what I mean. But you should be careful with the semantics of your pipeline. The problem cannot be gracefully solved if there’s a natural time offset between the two streams.
Best, Xingcan > On 14 Apr 2018, at 4:00 AM, Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote: > > Hi Xingcan, > > Thanks for your quick response and now I understand it better. To clarify, do > you mean try to add a static time when I override extractTimestamp function? > > For example, > > override def extractTimestamp(element: MyEvent, previousElementTimestamp: > Long): Long = { > val timestamp = element.getCreationTime() + 3600000L //1 hour delay > currentMaxTimestamp = max(timestamp, currentMaxTimestamp) > timestamp > } > > Appreciate your help! > > Best, > Chengzhi > > > On Fri, Apr 13, 2018 at 12:49 PM, Xingcan Cui <xingc...@gmail.com > <mailto:xingc...@gmail.com>> wrote: > Hi Chengzhi, > > currently, the watermarks of the two streams of a connected stream are > forcibly synchronized, i.e., the watermark is decided by the stream with a > larger delay. Thus the window trigger is also affected by this mechanism. > > As a workaround, you could try to add (or subtract) a static time offset to > one of your streams, which can make them more “close” to each other. > > Best, > Xingcan > > >> On 13 Apr 2018, at 11:48 PM, Chengzhi Zhao <w.zhaocheng...@gmail.com >> <mailto:w.zhaocheng...@gmail.com>> wrote: >> >> Hi, flink community, >> >> I had an issue with slow watermark advances and needs some help here. So >> here is what happened: I have two streams -- A and B, and they perform >> co-process to join together and A has another steam as output. >> >> A --> Output >> B --> (Connect A) --> Output >> >> I used BoundedOutOfOrdernessGenerator [1] with both A and B stream with 2 >> hours delay. The low watermark of A and output sink is within 2 hours >> window, however, the co-process end up with 10 hours low watermark late. >> >> My setup is I am using file system as source, so every 15 mins there will be >> files been drop to a directory and flink pick them up from there. >> >> Please advise and appreciate it in advance! >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks >> >> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks> >> >> Best, >> Chengzhi >> > >