[ https://issues.apache.org/jira/browse/FLINK-13492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16898592#comment-16898592 ]
Simon Su commented on FLINK-13492:
----------------------------------

Yes, I have marked it as a duplicate.

> BoundedOutOfOrderTimestamps cause Watermark's timestamp leak
> ------------------------------------------------------------
>
>                 Key: FLINK-13492
>                 URL: https://issues.apache.org/jira/browse/FLINK-13492
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.9.0
>            Reporter: Simon Su
>            Priority: Major
>         Attachments: Watermark_timestamp_leak.diff
>
>
> {code:java}
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.createLocalEnvironment(1, conf);
> // Use event time; the default autoWatermarkInterval is 200ms
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> Kafka kafka = new Kafka()
>     .version("0.11")
>     .topic(topic)
>     .startFromLatest()
>     .properties(properties);
> Schema schema = new Schema();
> for (int i = 0; i < names.length; i++) {
>     if ("timestamp".equalsIgnoreCase(names[i])) {
>         // set latency to 1000ms
>         schema.field("rowtime", types[i]).rowtime(new
>             Rowtime().timestampsFromField("timestamp").watermarksPeriodicBounded(1000));
>     } else {
>         schema.field(names[i], types[i]);
>     }
> }
> /** ..... */
> tableEnv
>     .connect(kafka)
>     .withFormat(new Protobuf().protobufName("order_sink"))
>     .withSchema(schema)
>     .inAppendMode()
>     .registerTableSource("orderStream");{code}
> After registering the upstream table, we ran a 10s Tumble window on it. We
> fed in a sequence of normal data, but no result was output.
> We then debugged whether the watermark was being emitted normally, and
> eventually found the issue.
> # maxTimestamp is initialized to Long.MIN_VALUE in
> BoundedOutOfOrderTimestamps.
> # The nextTimestamp method extracts the timestamp from the source and
> assigns it to maxTimestamp.
> # The getWatermark() method calculates the watermark's timestamp from
> maxTimestamp and delay.
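The three steps above can be sketched in plain Java. This is a simplified stand-in, not the Flink class: WatermarkUnderflowSketch, NaiveAssigner and GuardedAssigner are hypothetical names, and keeping the maximum in nextTimestamp is an assumption about how step 2 behaves. The point it shows is that Java long arithmetic wraps around, so calling getWatermark() before any record has arrived yields Long.MIN_VALUE - delay, which is a very large positive number. GuardedAssigner is one possible reading of the constructor fix suggested in this report.

```java
class WatermarkUnderflowSketch {

    // Hypothetical, simplified model of the three steps described in the report.
    static class NaiveAssigner {
        final long delay;
        long maxTimestamp = Long.MIN_VALUE;          // step 1: initial value

        NaiveAssigner(long delay) {
            this.delay = delay;
        }

        void nextTimestamp(long timestamp) {         // step 2: track the largest timestamp seen
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        long getWatermark() {                        // step 3: wraps around if no record was seen yet
            return maxTimestamp - delay;
        }
    }

    // Assumed variant of the proposed fix: add the delay up front in the constructor.
    static class GuardedAssigner {
        final long delay;
        long maxTimestamp;

        GuardedAssigner(long delay) {
            this.delay = delay;
            this.maxTimestamp = Long.MIN_VALUE + delay;
        }

        void nextTimestamp(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        long getWatermark() {                        // at worst Long.MIN_VALUE, never wraps
            return maxTimestamp - delay;
        }
    }

    public static void main(String[] args) {
        NaiveAssigner naive = new NaiveAssigner(1000L);
        GuardedAssigner guarded = new GuardedAssigner(1000L);

        // Timer fires before any record: the naive watermark has wrapped to a huge positive value.
        System.out.println("naive wrapped:   " + (naive.getWatermark() > 0));
        System.out.println("guarded at min:  " + (guarded.getWatermark() == Long.MIN_VALUE));

        // Once a record with timestamp 5000 arrives, both compute 5000 - 1000.
        naive.nextTimestamp(5000L);
        guarded.nextTimestamp(5000L);
        System.out.println("naive after:     " + naive.getWatermark());
        System.out.println("guarded after:   " + guarded.getWatermark());
    }
}
```

In the naive variant the first, wrapped watermark is larger than any watermark computed later from real timestamps, which matches the symptom described in this report.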
> When TimestampsAndPeriodicWatermarksOperator initializes and its open
> method is called, it registers a timer with SystemTimeService to generate
> watermarks every watermarkInterval. That is the problem: the timer thread
> can call BoundedOutOfOrderTimestamps$getCurrentWatermark before any
> timestamp has been extracted, which causes a long value leak
> (Long.MIN_VALUE - delay). In Java long arithmetic this subtraction wraps
> around to a very large positive value, so all later watermarks are dropped
> because they are less than (Long.MIN_VALUE - delay).
>
> A workaround is to set a large autoWatermarkInterval so that the
> SystemTimeService thread starts with a long delay.
>
> {code:java}
> public void onProcessingTime(long timestamp) throws Exception {
>     ...
>     getProcessingTimeService().registerTimer(now + watermarkInterval, this);
>     ...
> }
> {code}
>
>
> {code:java}
> public ScheduledFuture<?> registerTimer(long timestamp,
>         ProcessingTimeCallback target) {
>     ...
>     long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
>     ...
> }
> {code}
>
> Actually, I think we can fix it by adding the delay in the
> BoundedOutOfOrderTimestamps constructor, which avoids the calculation
> leak ...
>

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)