[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
nyingping updated FLINK-28504:
------------------------------
Description:
I have a window Top-N test job. The code is as follows:
```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

Configuration configuration = new Configuration();
// Expose the web UI on port 8082
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source subtask as idle if it receives no records for 10 seconds
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

// Kafka source whose watermark lags the largest `time` seen so far by 10 seconds
st.executeSql(
        "CREATE TABLE test (\n"
                + "  `key` STRING,\n"
                + "  `time` TIMESTAMP(3),\n"
                + "  `price` FLOAT,\n"
                + "  WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'test',\n"
                + "  'properties.bootstrap.servers' = 'testlocal:9092',\n"
                + "  'properties.group.id' = 'windowGroup',\n"
                + "  'scan.startup.mode' = 'latest-offset',\n"
                + "  'format' = 'json'\n"
                + ")");

// Window Top-N: the 3 keys with the highest total price per 1-minute tumbling window
String sqlWindowTopN =
        "select * from ("
                + "  select *, "
                + "    ROW_NUMBER() over (partition by window_start, window_end order by total desc) as rownum "
                + "  from ("
                + "    select `key`, window_start, window_end, count(`key`) as `count`, sum(price) as total from table ("
                + "      tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)"
                + "    ) group by window_start, window_end, `key`"
                + "  )"
                + ") where rownum <= 3";
st.executeSql(sqlWindowTopN).print();
```
After the job has been running for a long time, it still produces no results. The watermarks shown in the web UI are as follows:

!image-2022-07-12-15-11-51-653.png!

I didn't set the parallelism manually, so it defaults to 12. The Kafka source topic has only one partition, so 11 of the 12 source subtasks have no partition to read and stay idle. To keep these idle subtasks from holding back the watermark of the whole job, I set `table.exec.source.idle-timeout`.

As shown above, the planner automatically splits the window Top-N query into a local and a global aggregation. In the local phase, the watermark does not behave as I expected: the idle timeout does not seem to take effect there.

Manually setting the parallelism to 1 gives the expected result:
```
streamExecutionEnvironment.setParallelism(1);
```
!image-2022-07-12-15-19-29-950.png!

I can also configure the planner not to split the aggregation into local and global phases. In that case the `table.exec.source.idle-timeout` setting takes effect and the watermark is aligned:
```
st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");
```
Result:
!image-2022-07-12-15-20-06-919.png!

In short, I would like to keep the two-phase (local-global) aggregation and still have the watermarks aligned across idle partitions.
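To illustrate the behavior the idle timeout is expected to produce, here is a simplified, stdlib-only Java model. The class `IdleAwareWatermarkModel` and all of its methods are hypothetical and are not Flink's implementation. It assumes that an operator downstream of a shuffle takes the minimum watermark over its input channels that are not marked idle, and, for illustration only, that a tumbling window `[start, start + size)` fires once the watermark reaches `start + size - 1`. With 12 source subtasks and a single Kafka partition, the 11 partition-less channels keep the combined watermark at `Long.MIN_VALUE` until they are marked idle.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model (not Flink's actual implementation) of an
 * operator downstream of a shuffle: its watermark is the minimum over the
 * watermarks of all input channels that are NOT marked idle.
 */
public class IdleAwareWatermarkModel {

    /** Sentinel for "no watermark received yet". */
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long[] channelWatermarks;
    private final boolean[] channelIdle;

    public IdleAwareWatermarkModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        channelIdle = new boolean[numChannels];
        Arrays.fill(channelWatermarks, NO_WATERMARK);
    }

    /** A watermark arrives on a channel; this also makes the channel active again. */
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        channelIdle[channel] = false;
    }

    /** A channel is marked idle, e.g. after the upstream idle timeout expires. */
    public void markIdle(int channel) {
        channelIdle[channel] = true;
    }

    /** Minimum over active channels, or NO_WATERMARK if every channel is idle. */
    public long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? min : NO_WATERMARK;
    }

    /** Start of the tumbling window containing a non-negative event time (epoch ms). */
    public static long windowStart(long eventTime, long size) {
        return eventTime - (eventTime % size);
    }

    /** Assumption: window [start, start + size) fires once watermark >= start + size - 1. */
    public static boolean fires(long windowStart, long size, long watermark) {
        return watermark >= windowStart + size - 1;
    }

    public static void main(String[] args) {
        final long minute = 60_000L;
        final long delay = 10_000L;  // `time` - INTERVAL '10' SECOND
        final int parallelism = 12;  // default parallelism in the report

        IdleAwareWatermarkModel op = new IdleAwareWatermarkModel(parallelism);
        long start = windowStart(30_000L, minute); // event at 00:00:30 -> window [00:00, 00:01)

        // Only subtask 0 reads the single Kafka partition; its latest event is at 00:01:10.
        op.onWatermark(0, 70_000L - delay);
        long wm = op.combinedWatermark();
        System.out.println("without idle marking: watermark=" + wm
                + ", window fires=" + fires(start, minute, wm));

        // After the idle timeout, the 11 partition-less subtasks are marked idle.
        for (int i = 1; i < parallelism; i++) {
            op.markIdle(i);
        }
        wm = op.combinedWatermark();
        System.out.println("with idle marking: watermark=" + wm
                + ", window fires=" + fires(start, minute, wm));
    }
}
```

Running `main` prints `without idle marking: watermark=-9223372036854775808, window fires=false` followed by `with idle marking: watermark=60000, window fires=true`. In this model, parallelism 1 avoids the problem because there are no partition-less subtasks at all, while the `ONE_PHASE` plan corresponds to the second case, where idle marking takes effect; the report is that the local phase of the two-phase plan does not behave that way.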
> Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
> --------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-28504
> URL: https://issues.apache.org/jira/browse/FLINK-28504
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.14.2
> Environment: flink 1.14.1, kafka 2.4
> Reporter: nyingping
> Priority: Major
> Attachments: image-2022-07-12-15-11-51-653.png, image-2022-07-12-15-19-29-950.png, image-2022-07-12-15-20-06-919.png

--
This message was sent by Atlassian Jira (v8.20.10#820010)