Yes, Chengzhi. That’s exactly what I mean. But you should be careful with the 
semantics of your pipeline. The problem cannot be gracefully solved if there’s 
a natural time offset between the two streams.

Best, Xingcan
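
To illustrate the synchronization described in this thread, here is a minimal sketch in plain Scala with no Flink dependency. It assumes the operator on a connected stream advances to the smaller of its two input watermarks. The object `WatermarkSync`, its methods, and all the numbers are hypothetical and chosen only for illustration; they are not Flink API and are not taken from the actual job.

```scala
// Illustrative model only; these names are hypothetical and not part of Flink's API.
object WatermarkSync {
  val Hour: Long = 3600000L // one hour in milliseconds

  // Watermark of a bounded-out-of-orderness assigner: the largest timestamp
  // seen so far minus the allowed delay.
  def watermark(maxTimestampSeen: Long, maxOutOfOrderness: Long): Long =
    maxTimestampSeen - maxOutOfOrderness

  // A two-input operator advances its event-time clock to the smaller of the
  // two input watermarks, so the lagging stream decides.
  def combined(watermarkA: Long, watermarkB: Long): Long =
    math.min(watermarkA, watermarkB)

  def main(args: Array[String]): Unit = {
    val delay = 2 * Hour
    val maxA = 12 * Hour // made-up: newest event time seen on A
    val maxB = 4 * Hour  // made-up: B lags A by 8 hours

    // Without an offset, B's watermark holds the combined watermark back.
    val before = combined(watermark(maxA, delay), watermark(maxB, delay))
    // With a static 8-hour offset added to B's timestamps, the two line up.
    val after = combined(watermark(maxA, delay), watermark(maxB + 8 * Hour, delay))

    println(s"lag behind A's newest event without offset: ${(maxA - before) / Hour} h")
    println(s"lag behind A's newest event with offset: ${(maxA - after) / Hour} h")
  }
}
```

With these made-up numbers the combined watermark trails A's newest event by 10 hours without the offset and by 2 hours with it. The offset only helps when the gap between the streams is roughly constant, which is the caveat about a natural time offset above.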

> On 14 Apr 2018, at 4:00 AM, Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:
> 
> Hi Xingcan,
> 
> Thanks for your quick response; I understand it better now. To clarify, do 
> you mean adding a static offset when I override the extractTimestamp function?
> 
> For example, 
> 
> override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
>     // Shift the event time by a static offset of one hour (in milliseconds).
>     val timestamp = element.getCreationTime() + 3600000L
>     currentMaxTimestamp = math.max(timestamp, currentMaxTimestamp)
>     timestamp
> }
> 
> Appreciate your help!
> 
> Best,
> Chengzhi
> 
> 
> On Fri, Apr 13, 2018 at 12:49 PM, Xingcan Cui <xingc...@gmail.com> wrote:
> Hi Chengzhi,
> 
> Currently, the watermarks of the two inputs of a connected stream are 
> forcibly synchronized, i.e., the operator's watermark is determined by the 
> input with the larger delay. The window trigger is therefore affected by 
> this mechanism as well.
> 
> As a workaround, you could try adding (or subtracting) a static time offset 
> to one of your streams, which can bring the two closer to each other.
> 
> Best,
> Xingcan
> 
> 
>> On 13 Apr 2018, at 11:48 PM, Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:
>> 
>> Hi Flink community,
>> 
>> I have an issue with slow watermark advancement and need some help. Here is 
>> what happened: I have two streams, A and B. They are joined with a 
>> co-process function, and A also has another stream as output. 
>> 
>> A --> Output
>> B --> (Connect A) --> Output
>> 
>> I used BoundedOutOfOrdernessGenerator [1] on both streams A and B with a 
>> 2-hour delay. The low watermark of A and its output sink stays within the 
>> 2-hour window; however, the low watermark of the co-process ends up 10 hours 
>> late.
>> 
>> My setup uses the file system as the source: every 15 minutes, files are 
>> dropped into a directory, and Flink picks them up from there. 
>> 
>> Please advise. I appreciate your help in advance!
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks
>> 
>> Best,
>> Chengzhi
>> 
> 
> 
