lincoln lee created FLINK-32501:
-----------------------------------
Summary: Wrong execution plan of a proctime window aggregation generated due to incorrect cost evaluation
Key: FLINK-32501
URL: https://issues.apache.org/jira/browse/FLINK-32501
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.17.1, 1.16.2
Reporter: lincoln lee
Assignee: lincoln lee
Fix For: 1.18.0, 1.17.2
Currently, when a window aggregation refers to a windowing TVF and the query has a filter condition, the planner may generate a wrong plan that hangs forever at runtime (the window aggregate operator never emits any output).
For example, consider the following query:
{code}
insert into sink
select
  window_start,
  window_end,
  b,
  COALESCE(sum(case
    when a = 11
    then 1
  end), 0) c
from
  TABLE(
    TUMBLE(TABLE source, DESCRIPTOR(proctime), INTERVAL '10' SECONDS)
  )
where
  a in (1, 5, 7, 9, 11)
GROUP BY
  window_start, window_end, b
{code}
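To make the intended semantics of this query concrete, the following is a hypothetical Java sketch that evaluates it over an in-memory list of rows, independent of Flink. The class `ProctimeTumbleSketch`, its `Row` type and the `"windowStart|b"` key format are illustrative assumptions, not Flink APIs, and the window assignment assumes a zero window offset.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, Flink-independent model of what the query computes.
// ProctimeTumbleSketch, Row and the "windowStart|b" key format are illustrative only.
class ProctimeTumbleSketch {
    static final long WINDOW_SIZE_MILLIS = 10_000L;               // INTERVAL '10' SECONDS
    static final Set<Integer> ALLOWED_A = Set.of(1, 5, 7, 9, 11); // a in (1, 5, 7, 9, 11)

    // One source row plus the processing time at which it was seen (epoch millis).
    static final class Row {
        final int a;
        final String b;
        final long proctimeMillis;

        Row(int a, String b, long proctimeMillis) {
            this.a = a;
            this.b = b;
            this.proctimeMillis = proctimeMillis;
        }
    }

    // TUMBLE with zero offset: a timestamp ts belongs to [windowStart, windowEnd).
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_SIZE_MILLIS);
    }

    static long windowEnd(long ts) {
        return windowStart(ts) + WINDOW_SIZE_MILLIS;
    }

    // Groups by (window_start, b) and computes
    // c = COALESCE(SUM(CASE WHEN a = 11 THEN 1 END), 0).
    static Map<String, Long> aggregate(List<Row> rows) {
        Map<String, Long> result = new TreeMap<>();
        for (Row r : rows) {
            if (!ALLOWED_A.contains(r.a)) {
                continue; // dropped by the WHERE clause
            }
            String key = windowStart(r.proctimeMillis) + "|" + r.b;
            // The CASE yields NULL when a != 11; SUM skips NULLs and COALESCE maps an
            // all-NULL group to 0, so counting 1 or 0 per row gives the same result.
            long contribution = (r.a == 11) ? 1L : 0L;
            result.merge(key, contribution, Long::sum);
        }
        return result;
    }
}
```

In the real job each such group is only emitted once processing time passes its `window_end`; the sketch covers what is computed, not when it fires.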
This query generates a wrong plan that does not merge the proctime WindowTableFunction into the WindowAggregate. When the plan is translated into an execution plan, the WindowAggregate therefore wrongly recognizes the window as an event-time window, so the WindowAggregateOperator neither receives watermarks nor sets up timers, and no window ever fires at runtime. The wrong plan:
{code}
Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
+- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
   +- WindowAggregate(groupBy=[b], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
      +- Exchange(distribution=[hash[b]])
         +- Calc(select=[window_start, window_end, b, CASE((a = 11), 1, null:INTEGER) AS $f3], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
            +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[10 s])])
               +- Calc(select=[a, b, PROCTIME() AS proctime])
                  +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
{code}
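The hang can be illustrated with a small hypothetical model of window firing. `WindowFiringSketch` is not Flink's `WindowAggregateOperator`; it only assumes, loosely following Flink's convention, that a window fires once the relevant clock reaches the window's last timestamp (`end - 1`). The same pending window fires under a processing-time clock, but an operator that treats it as an event-time window waits for a watermark that a proctime-only pipeline never produces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the symptom; not Flink code.
class WindowFiringSketch {
    // Which clock the operator believes drives the window.
    enum TimeAttribute { PROCTIME, ROWTIME }

    // A tumbling window covering [start, end).
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Returns the pending windows that would fire given the current clocks.
    // currentWatermark stays at Long.MIN_VALUE when no watermark ever arrives,
    // which is the situation of a pipeline that only has a proctime attribute.
    static List<Window> firedWindows(List<Window> pending, TimeAttribute attr,
                                     long currentProcessingTime, long currentWatermark) {
        long clock = (attr == TimeAttribute.PROCTIME) ? currentProcessingTime : currentWatermark;
        List<Window> fired = new ArrayList<>();
        for (Window w : pending) {
            if (clock >= w.end - 1) { // assumed firing point: the window's last timestamp
                fired.add(w);
            }
        }
        return fired;
    }
}
```

With one pending window [0, 10000) and processing time at 60000, the PROCTIME branch fires it, while the ROWTIME branch, whose watermark is still Long.MIN_VALUE, returns nothing. That mirrors the wrong plan above, where the misclassified window never fires.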
Expected plan:
{code}
Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
+- Calc(select=[CAST(window_start AS TIMESTAMP(6)) AS ws, CAST(window_end AS TIMESTAMP(6)) AS we, b, CAST(COALESCE($f1, 0) AS BIGINT) AS c])
   +- WindowAggregate(groupBy=[b], window=[TUMBLE(time_col=[proctime], size=[10 s])], select=[b, SUM($f3) AS $f1, start('w$) AS window_start, end('w$) AS window_end])
      +- Exchange(distribution=[hash[b]])
         +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
            +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
{code}