lincoln lee created FLINK-32501:
-----------------------------------
Summary: Wrong execution plan of a proctime window aggregation generated due to incorrect cost evaluation
Key: FLINK-32501
URL: https://issues.apache.org/jira/browse/FLINK-32501
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.17.1, 1.16.2
Reporter: lincoln lee
Assignee: lincoln lee
Fix For: 1.18.0, 1.17.2
Currently, when a window aggregation references a windowing TVF that has a filter condition, the planner may generate a wrong plan that hangs forever at runtime (the window aggregate operator never emits any output). For example, the following query:

{code}
insert into sink
select
  window_start,
  window_end,
  b,
  COALESCE(sum(case when a = 11 then 1 end), 0) c
from TABLE(
  TUMBLE(TABLE source, DESCRIPTOR(proctime), INTERVAL '10' SECONDS)
)
where a in (1, 5, 7, 9, 11)
GROUP BY window_start, window_end, b
{code}

generates a wrong plan that does not merge the proctime WindowTableFunction into the WindowAggregate. As a result, when the plan is translated into an execution plan, the WindowAggregate wrongly recognizes the window as an event-time window, so the WindowAggregateOperator neither receives watermarks nor sets up timers, and no window ever fires at runtime:

{code}
Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
+- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
   +- WindowAggregate(groupBy=[b], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
      +- Exchange(distribution=[hash[b]])
         +- Calc(select=[window_start, window_end, b, CASE((a = 11), 1, null:INTEGER) AS $f3], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
            +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[10 s])])
               +- Calc(select=[a, b, PROCTIME() AS proctime])
                  +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
{code}

Expected plan, in which the WindowTableFunction is merged into the WindowAggregate and the window is defined directly on the proctime column:

{code}
Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
+- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
   +- WindowAggregate(groupBy=[b], window=[TUMBLE(time_col=[proctime], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
      +- Exchange(distribution=[hash[b]])
         +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
            +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
{code}
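Why the misclassified window hangs can be illustrated with a deliberately simplified toy model. This is a hypothetical sketch, not Flink's actual WindowAggregateOperator: the class `WindowFiringSketch`, its methods, and the firing rule (a window fires once the relevant clock reaches its end) are all assumptions made for illustration. An event-time window only fires when a watermark advances past its end, while a proctime window fires as processing time advances. If a proctime window is treated as event-time and the source never produces watermarks, the buffered aggregates are never emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of a tumbling-window SUM; NOT Flink's WindowAggregateOperator.
class WindowFiringSketch {
    static final long SIZE_MS = 10_000L; // TUMBLE size: INTERVAL '10' SECONDS

    enum TimeKind { EVENT_TIME, PROC_TIME }

    private final TimeKind kind;
    // window_start -> running value of COALESCE(sum(case when a = 11 then 1 end), 0)
    private final TreeMap<Long, Long> pending = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();

    WindowFiringSketch(TimeKind kind) { this.kind = kind; }

    void processElement(int a, long timestampMs) {
        // where a in (1, 5, 7, 9, 11)
        if (a != 1 && a != 5 && a != 7 && a != 9 && a != 11) {
            return;
        }
        // Assign the row to its tumbling window.
        long windowStart = timestampMs - Math.floorMod(timestampMs, SIZE_MS);
        pending.merge(windowStart, a == 11 ? 1L : 0L, Long::sum);
    }

    // Only an event-time window reacts to watermarks.
    void advanceWatermark(long watermarkMs) {
        if (kind == TimeKind.EVENT_TIME) {
            fireUpTo(watermarkMs);
        }
    }

    // Only a proctime window reacts to processing-time progress (its timers).
    void advanceProcessingTime(long nowMs) {
        if (kind == TimeKind.PROC_TIME) {
            fireUpTo(nowMs);
        }
    }

    // Emit every window whose end is at or before the given time.
    private void fireUpTo(long timeMs) {
        while (!pending.isEmpty() && pending.firstKey() + SIZE_MS <= timeMs) {
            Map.Entry<Long, Long> e = pending.pollFirstEntry();
            emitted.add("[" + e.getKey() + ", " + (e.getKey() + SIZE_MS) + ") c=" + e.getValue());
        }
    }

    static List<String> run(TimeKind kind) {
        WindowFiringSketch op = new WindowFiringSketch(kind);
        // Rows are stamped with processing time; the source emits no watermarks.
        op.processElement(11, 1_000L);
        op.processElement(5, 2_000L);
        op.processElement(11, 12_000L);
        op.processElement(3, 13_000L); // dropped by the WHERE clause
        op.advanceProcessingTime(25_000L);
        return op.emitted;
    }

    public static void main(String[] args) {
        // prints: proctime window: [[0, 10000) c=1, [10000, 20000) c=1]
        System.out.println("proctime window: " + run(TimeKind.PROC_TIME));
        // prints: misclassified as event-time: []
        System.out.println("misclassified as event-time: " + run(TimeKind.EVENT_TIME));
    }
}
```

In the toy model the same input produces two results per window when the window is handled as proctime, but nothing at all when it is handled as event-time, which mirrors the "never outputs" symptom described above.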