nyingping created FLINK-28504:
---------------------------------

             Summary: Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
                 Key: FLINK-28504
                 URL: https://issues.apache.org/jira/browse/FLINK-28504
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.14.2
         Environment: flink 1.14.1
kafka 2.4
            Reporter: nyingping
         Attachments: image-2022-07-12-15-11-51-653.png, image-2022-07-12-15-19-29-950.png, image-2022-07-12-15-20-06-919.png

I have a window TopN test job. The code is as follows:

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Local environment with the web UI on port 8082
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source as idle after 10s without records
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

st.executeSql(
        "CREATE TABLE test (\n" +
        "  `key` STRING,\n" +
        "  `time` TIMESTAMP(3),\n" +
        "  `price` float,\n" +
        "  WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'test',\n" +
        "  'properties.bootstrap.servers' = 'testlocal:9092',\n" +
        "  'properties.group.id' = 'windowGroup',\n" +
        "  'scan.startup.mode' = 'latest-offset',\n" +
        "  'format' = 'json'\n" +
        ")");

// Top 3 keys by total price per 1-minute tumbling window
String sqlWindowTopN =
        "select * from (" +
        "  select *, " +
        "  ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
        "  from (" +
        "    select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
        "      tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)" +
        "    ) group by window_start, window_end, key" +
        "  )" +
        ") where rownum <= 3";
st.executeSql(sqlWindowTopN).print();
```

After starting the job, no results are produced, even after a long time. The watermark appears in the UI as follows:

!image-2022-07-12-15-11-51-653.png!

I did not set the parallelism manually, so it defaults to 12. The Kafka source topic has only one partition, so the remaining source subtasks are idle. To keep the watermark aligned across the whole job, I set the `table.exec.source.idle-timeout` configuration.

As shown above, the planner automatically split the window TopN SQL into local and global aggregation tasks.
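For context, the watermark behavior at stake can be sketched as follows. This is only a simplified model and not Flink's actual implementation: it assumes that a downstream operator takes the minimum watermark over all inputs that are not marked idle. The class name `WatermarkAlignmentSketch` and the method `combine` are hypothetical and exist only for this illustration.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of how watermarks from several inputs are
// combined. Not Flink code; it only illustrates the min-over-active-inputs rule.
class WatermarkAlignmentSketch {

    /**
     * Returns the minimum watermark over all inputs that are not marked idle,
     * or an empty result if every input is idle.
     */
    static OptionalLong combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Three source subtasks, one Kafka partition: only subtask 0 sees data.
        // Subtasks that never emitted a watermark are modeled as Long.MIN_VALUE.
        long[] watermarks = {60_000L, Long.MIN_VALUE, Long.MIN_VALUE};

        // No input marked idle: the combined watermark is held back by the
        // subtasks without data, so event-time windows never fire.
        System.out.println(combine(watermarks, new boolean[] {false, false, false}));

        // Inputs without data marked idle: the combined watermark follows the
        // single active subtask.
        System.out.println(combine(watermarks, new boolean[] {false, true, true}));
    }
}
```

In this model, the symptom described in this report corresponds to the first call: if the idle status does not reach the operator that combines the inputs, the watermark stays at its initial value.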
In the local phase, the watermark did not behave as I expected. Manually setting the parallelism to 1 produced the behavior I expected:

`streamExecutionEnvironment.setParallelism(1);`

!image-2022-07-12-15-19-29-950.png!

Alternatively, I can configure the planner not to split the aggregation into local and global phases. In that case, the `table.exec.source.idle-timeout` configuration takes effect and the watermark is aligned:

`st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");`

Result:

!image-2022-07-12-15-20-06-919.png!

As described above, I would like to be able to use two-phase aggregation and still have the watermarks aligned.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)