[ https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Baozhu Zhao updated FLINK-35885:
--------------------------------
Description:
We have discovered an unexpected case in which rows with a count of 0 are emitted when a processing-time (proctime) window aggregation runs on a table that also defines a watermark. The SQL is as follows:
{code:sql}
CREATE TABLE s1 (
  id INT,
  event_time TIMESTAMP(3),
  name STRING,
  proc_time AS PROCTIME(),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
  'connector' = 'my-source'
);

SELECT *
FROM (
  SELECT
    name,
    COUNT(id) AS total_count,
    window_start,
    window_end
  FROM TABLE(
    TUMBLE(TABLE s1, DESCRIPTOR(proc_time), INTERVAL '30' SECONDS)
  )
  GROUP BY window_start, window_end, name
)
WHERE total_count = 0;
{code}
For the detailed test code, please refer to https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java

The root cause is that org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator.java#processWatermark() advances the window processor's progress whenever a newer watermark arrives, without checking whether the window is defined on processing time. When the watermark suddenly jumps past the end timestamp of the next window, a row with a count of 0 is emitted.
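To make the failure mode easier to reason about, here is a toy, single-key model. It is not Flink code: ToyProcTimeWindow and all of its fields and methods are hypothetical, and only the method names advanceProgress and processWatermark echo the report. It assumes a simplified version of the behavior described above: records are buffered per window and flushed into state only when progress crosses the next trigger point, a processing-time timer at the window end fires the window from state (using 0 if nothing was flushed), and processWatermark() may advance that same progress. Under these assumptions, a watermark of 100000 arriving before a record at processing time 1000 pushes the progress past the window end 30000, so the timer for that window no longer triggers a flush and the window fires empty.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical toy model of a proctime tumbling COUNT window; NOT Flink's SlicingWindowOperator. */
public class ToyProcTimeWindow {
    private final long size;                                   // window size in ms
    private final boolean watermarkAdvancesProgress;           // true = path described in the report
    private final Map<Long, Long> buffer = new TreeMap<>();    // windowEnd -> buffered count
    private final Map<Long, Long> state = new TreeMap<>();     // windowEnd -> flushed count
    private final TreeSet<Long> timers = new TreeSet<>();      // registered processing-time timers
    private long currentProgress = Long.MIN_VALUE;
    private long nextTriggerProgress = Long.MIN_VALUE;
    private long currentWatermark = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    ToyProcTimeWindow(long size, boolean watermarkAdvancesProgress) {
        this.size = size;
        this.watermarkAdvancesProgress = watermarkAdvancesProgress;
    }

    /** Buffers a record in the window [end - size, end) containing its processing time. */
    void processElement(long procTime) {
        long windowEnd = procTime - Math.floorMod(procTime, size) + size;
        buffer.merge(windowEnd, 1L, Long::sum);
        timers.add(windowEnd); // one processing-time timer per window end
    }

    /** Flushes the whole buffer into state, but only when progress crosses nextTriggerProgress. */
    void advanceProgress(long progress) {
        if (progress > currentProgress) {
            currentProgress = progress;
            if (currentProgress >= nextTriggerProgress) {
                buffer.forEach((end, n) -> state.merge(end, n, Long::sum));
                buffer.clear();
                // end of the window that contains currentProgress
                nextTriggerProgress = currentProgress - Math.floorMod(currentProgress, size) + size;
            }
        }
    }

    void processWatermark(long watermark) {
        if (watermark > currentWatermark) {
            currentWatermark = watermark;
            if (watermarkAdvancesProgress) {
                advanceProgress(watermark); // assumed path: event time drives proctime progress
            }
        }
    }

    /** Fires each timer up to now: advance progress to it, then emit the count in state (0 if absent). */
    void advanceProcessingTime(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            long windowEnd = timers.pollFirst();
            advanceProgress(windowEnd);
            long count = state.getOrDefault(windowEnd, 0L);
            state.remove(windowEnd);
            output.add("window_end=" + windowEnd + " total_count=" + count);
        }
    }

    static List<String> run(boolean watermarkAdvancesProgress) {
        ToyProcTimeWindow op = new ToyProcTimeWindow(30_000L, watermarkAdvancesProgress);
        op.processWatermark(100_000L); // watermark jumps past the next window end (30000)
        op.processElement(1_000L);     // record lands in window [0, 30000)
        op.advanceProcessingTime(30_000L);
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println("watermark advances progress: " + run(true));
        System.out.println("watermark ignored:           " + run(false));
    }
}
{code}

In this toy, when the watermark drives progress the window ending at 30000 is emitted with total_count=0 while the record stays stranded in the buffer; when the watermark is ignored, the same window is emitted with total_count=1. Ignoring the watermark here only contrasts the two paths in the model; it is not a proposed fix for FLINK-35885.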
For reference, SlicingWindowOperator#processWatermark():
{code:java}
public void processWatermark(Watermark mark) throws Exception {
    if (mark.getTimestamp() > currentWatermark) {
        // A newer watermark advances the window processor's progress,
        // with no check for whether the window is defined on processing time.
        windowProcessor.advanceProgress(mark.getTimestamp());
        super.processWatermark(mark);
    } else {
        // Older or equal watermark: forward the current watermark instead.
        super.processWatermark(new Watermark(currentWatermark));
    }
}
{code}

> proctime aggregate window triggered by watermark
> ------------------------------------------------
>
>                 Key: FLINK-35885
>                 URL: https://issues.apache.org/jira/browse/FLINK-35885
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.13.6, 1.17.2
>         Environment: Flink 1.13.6 with Blink, or Flink 1.17.2
>            Reporter: Baozhu Zhao
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.10#820010)