[ 
https://issues.apache.org/jira/browse/FLINK-13492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16898592#comment-16898592
 ] 

Simon Su commented on FLINK-13492:
----------------------------------

Yes, I have marked it as a duplicate.

> BoundedOutOfOrderTimestamps cause Watermark's timestamp leak
> ------------------------------------------------------------
>
>                 Key: FLINK-13492
>                 URL: https://issues.apache.org/jira/browse/FLINK-13492
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.9.0
>            Reporter: Simon Su
>            Priority: Major
>         Attachments: Watermark_timestamp_leak.diff
>
>
> {code:java}
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.createLocalEnvironment(1, conf);
> // Use event time; the default autoWatermarkInterval is 200 ms
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> 
> Kafka kafka = new Kafka()
>     .version("0.11")
>     .topic(topic)
>     .startFromLatest()
>     .properties(properties);
> 
> Schema schema = new Schema();
> for (int i = 0; i < names.length; i++) {
>     if ("timestamp".equalsIgnoreCase(names[i])) {
>         // allow up to 1000 ms of out-of-order lateness
>         schema.field("rowtime", types[i]).rowtime(
>             new Rowtime()
>                 .timestampsFromField("timestamp")
>                 .watermarksPeriodicBounded(1000));
>     } else {
>         schema.field(names[i], types[i]);
>     }
> }
> /* ..... */
> tableEnv
>     .connect(kafka)
>     .withFormat(new Protobuf().protobufName("order_sink"))
>     .withSchema(schema)
>     .inAppendMode()
>     .registerTableSource("orderStream");{code}
> After registering the upstream table, we apply a 10s tumbling window to it
> and feed in a sequence of normal records, but no results are emitted.
> We then debugged whether the watermark was being emitted correctly, and
> finally we found the issue:
>  # maxTimestamp is initialized to Long.MIN_VALUE in
> BoundedOutOfOrderTimestamps.
>  # The nextTimestamp method extracts the timestamp from each source record
> and updates maxTimestamp.
>  # The getWatermark() method computes the watermark's timestamp from
> maxTimestamp and the delay (maxTimestamp - delay).
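
The three steps above can be sketched as follows (a simplified model for illustration, not the actual Flink source; names follow the description in this issue):

```java
// Simplified sketch of the watermark generator behaviour described in
// steps 1-3 above (a model for illustration, not the actual Flink class).
public class WatermarkSketch {

    static class BoundedOutOfOrderTimestamps {
        private final long delay;
        private long maxTimestamp = Long.MIN_VALUE; // step 1: initial value

        BoundedOutOfOrderTimestamps(long delay) {
            this.delay = delay;
        }

        // step 2: track the largest event timestamp seen so far
        void nextTimestamp(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        // step 3: watermark = maxTimestamp - delay
        long getWatermark() {
            return maxTimestamp - delay;
        }
    }

    public static void main(String[] args) {
        BoundedOutOfOrderTimestamps gen = new BoundedOutOfOrderTimestamps(1000);
        gen.nextTimestamp(5_000);
        gen.nextTimestamp(4_200); // an out-of-order record does not move the max
        System.out.println(gen.getWatermark()); // 4000 = 5000 - 1000
    }
}
```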
> When TimestampsAndPeriodicWatermarksOperator is initialized and its open()
> method is called, it registers a timer with the SystemTimeService that
> generates a watermark every watermarkInterval. That's the problem: the timer
> thread can call BoundedOutOfOrderTimestamps#getCurrentWatermark before any
> record has been processed, which computes Long.MIN_VALUE - delay. This long
> value underflows and wraps around to a huge positive number, so all of the
> subsequent (real) watermarks are dropped because they are smaller than it.
> A workaround is to set a large autoWatermarkInterval, giving the
> SystemTimeService thread a long start delay.
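
The wrap-around can be reproduced with plain long arithmetic (the delay value here is just the 1000 ms bound from the snippet above):

```java
// Demonstrates the long underflow: before any record arrives,
// maxTimestamp is still Long.MIN_VALUE, and subtracting the delay
// silently wraps around to a huge positive value.
public class UnderflowDemo {
    public static void main(String[] args) {
        long maxTimestamp = Long.MIN_VALUE; // no record processed yet
        long delay = 1000L;                 // watermarksPeriodicBounded(1000)

        long bogusWatermark = maxTimestamp - delay; // silently wraps around
        System.out.println(bogusWatermark > 0);     // true

        // Math.subtractExact makes the overflow explicit instead of silent.
        try {
            Math.subtractExact(maxTimestamp, delay);
        } catch (ArithmeticException e) {
            System.out.println("long overflow");
        }

        // Any real watermark is smaller than the bogus one, so a consumer
        // that only accepts increasing watermarks drops every one of them.
        long realWatermark = 5_000L - delay;
        System.out.println(realWatermark < bogusWatermark); // true
    }
}
```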
>  
> {code:java}
> public void onProcessingTime(long timestamp) throws Exception {
> ...
> getProcessingTimeService().registerTimer(now + watermarkInterval, this);
> ...
> }
> {code}
>  
>  
> {code:java}
> public ScheduledFuture<?> registerTimer(long timestamp, 
> ProcessingTimeCallback target) {
> ...
> long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
> ...
> }
> {code}
>  
> Actually, I think we can fix it by adding the delay to maxTimestamp's
> initial value in BoundedOutOfOrderTimestamps's constructor, which avoids
> the underflow in the calculation ...
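
A sketch of that idea (hypothetical code, not the actual patch attached as Watermark_timestamp_leak.diff): offset the initial maxTimestamp by the delay so the watermark bottoms out at Long.MIN_VALUE instead of underflowing.

```java
// Hypothetical sketch of the proposed constructor fix: start maxTimestamp
// at Long.MIN_VALUE + delay so that getWatermark() can never underflow.
public class FixedGenerator {
    private final long delay;
    private long maxTimestamp;

    public FixedGenerator(long delay) {
        this.delay = delay;
        this.maxTimestamp = Long.MIN_VALUE + delay; // the fix
    }

    public void nextTimestamp(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    public long getWatermark() {
        // Before any record: (Long.MIN_VALUE + delay) - delay == Long.MIN_VALUE,
        // a valid "no watermark yet" value instead of a wrapped positive one.
        return maxTimestamp - delay;
    }

    public static void main(String[] args) {
        FixedGenerator gen = new FixedGenerator(1000);
        System.out.println(gen.getWatermark() == Long.MIN_VALUE); // true
        gen.nextTimestamp(5_000);
        System.out.println(gen.getWatermark()); // 4000
    }
}
```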
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)