[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
nyingping updated FLINK-28504:
------------------------------
    Environment: 
flink 1.14
kafka 2.4

  was:
flink 1.14.1
kafka 2.4


> Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28504
>                 URL: https://issues.apache.org/jira/browse/FLINK-28504
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.2
>         Environment: flink 1.14
> kafka 2.4
>            Reporter: nyingping
>            Priority: Major
>         Attachments: image-2022-07-12-15-11-51-653.png, image-2022-07-12-15-19-29-950.png, image-2022-07-12-15-20-06-919.png
>
>
> I have a window Top-N test job. The code is as follows:
>
> {code:java}
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestOptions;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class WindowTopNTest {
>     public static void main(String[] args) {
>         Configuration configuration = new Configuration();
>         // Serve the web UI on port 8082 so the watermarks can be inspected
>         configuration.setInteger(RestOptions.PORT, 8082);
>         StreamExecutionEnvironment streamExecutionEnvironment =
>                 StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>         StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);
>
>         // Mark a source subtask as idle after it has received no data for 10 s
>         st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");
>
>         st.executeSql(
>                 "CREATE TABLE test (\n"
>                         + "  `key` STRING,\n"
>                         + "  `time` TIMESTAMP(3),\n"
>                         + "  `price` float,\n"
>                         + "  WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND\n"
>                         + ") WITH (\n"
>                         + "  'connector' = 'kafka',\n"
>                         + "  'topic' = 'test',\n"
>                         + "  'properties.bootstrap.servers' = 'testlocal:9092',\n"
>                         + "  'properties.group.id' = 'windowGroup',\n"
>                         + "  'scan.startup.mode' = 'latest-offset',\n"
>                         + "  'format' = 'json'\n"
>                         + ")");
>
>         // Window Top-N: the 3 keys with the highest total price in each 1-minute tumbling window
>         String sqlWindowTopN =
>                 "select * from ("
>                         + " select *, "
>                         + " ROW_NUMBER() over (partition by window_start, window_end order by total desc) as rownum "
>                         + " from ("
>                         + " select key, window_start, window_end, count(key) as `count`, sum(price) total from table ("
>                         + " tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)"
>                         + " ) group by window_start, window_end, key"
>                         + " )"
>                         + ") where rownum <= 3";
>         st.executeSql(sqlWindowTopN).print();
>     }
> }
> {code}
>
> After running it for a long time, I still get no results.
> The watermarks shown in the web UI are as follows:
>
> !image-2022-07-12-15-11-51-653.png|width=898,height=388!
>
> I didn't set the parallelism manually, so it defaults to 12. The Kafka topic has only one partition, so most of the source subtasks have no partition to read and stay idle. To keep the watermark of the whole job aligned despite these idle subtasks, I use the `table.exec.source.idle-timeout` configuration.
>
> As shown above, the planner automatically split the window Top-N SQL into local-global aggregation. In the local phase, the watermark did not advance as I expected.
>
> Manually setting the parallelism to 1 gives the expected behavior:
> {code:java}
> streamExecutionEnvironment.setParallelism(1);
> {code}
> !image-2022-07-12-15-19-29-950.png|width=872,height=384!
>
> I can also configure the planner not to split the aggregation into local and global phases. In that case, the `table.exec.source.idle-timeout` configuration takes effect and the watermark is aligned:
> {code:java}
> st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");
> {code}
> Result:
> !image-2022-07-12-15-20-06-919.png|width=866,height=357!
>
> To sum up: when the number of Kafka partitions differs from the Flink source parallelism and some source subtasks are therefore idle, I expect the `table.exec.source.idle-timeout` configuration to keep the watermarks aligned, but with local-global aggregation it seems to have no effect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
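For context on what `table.exec.source.idle-timeout` is expected to achieve: a downstream operator with several input channels forwards the minimum watermark of its non-idle inputs, so one input that never receives data holds the watermark back until it is marked idle. The class below is a hypothetical, simplified model written only to illustrate that rule (the name `IdleAwareWatermarkSketch` and its methods are invented; this is not Flink's internal watermark handling, and it does not explain the local-global behavior reported above).

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of how an operator with several input
 * channels derives its output watermark. Not Flink's real implementation.
 */
public class IdleAwareWatermarkSketch {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;

    public IdleAwareWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        channelIdle = new boolean[numChannels];
        // No watermark has been seen on any channel yet
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** A channel delivered a watermark, which also makes it active again. */
    public void onWatermark(int channel, long watermark) {
        channelIdle[channel] = false;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** A channel was declared idle, e.g. after an idle timeout on its source. */
    public void markIdle(int channel) {
        channelIdle[channel] = true;
    }

    /**
     * Output watermark: the minimum over non-idle channels. In this sketch,
     * Long.MIN_VALUE is returned when every channel is idle.
     */
    public long currentWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Channel 0 reads the only Kafka partition; channel 1 never gets data.
        IdleAwareWatermarkSketch valve = new IdleAwareWatermarkSketch(2);
        valve.onWatermark(0, 60_000L);
        // Still Long.MIN_VALUE: the empty channel 1 holds the watermark back
        System.out.println(valve.currentWatermark());
        valve.markIdle(1);
        // Now 60000: only the active channel 0 is considered
        System.out.println(valve.currentWatermark());
    }
}
```

If a channel that was marked idle delivers a watermark again, this sketch treats it as active and it participates in the minimum once more.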