Hello,

We seem to be facing an issue with Flink where the physical plan after planner optimization is not correct. I have been able to reproduce the issue in the following "simplified" use case (it doesn't seem to happen in trivial cases):
1. We open 2 event streams ("clicks" and "displays").
2. We compute the click rate (ctr) over 2-hour and 6-hour sliding windows.
3. We then union the two results to output one row per hour, holding the max of the values computed over 2 and 6 hours.

You can find the SQL query below [1]. After activating debug logging for Calcite, I can see that the original logical plan is valid: the top-level UNION is between two LogicalProjects, one for the 2-hour and one for the 6-hour HOP window [2]. However, in the final physical plan, both sides of the UNION now have the same HOP window (7200000 ms, i.e. 2 hours, in the plan pasted below) instead of one window over 2 hours and one over 6 hours [3].

I pushed a commit to my fork to reproduce the issue: https://github.com/BenoitHanotte/flink/commit/3d388f153b44bb35b57b8400407ff24a2e91661f. Unfortunately, simplifying the query seems to make the issue disappear.

Is there anything obvious I am missing, or do you have any pointers as to what could trigger this issue? I looked at the different rules applied by the planner [4], but, as I am not familiar with them, I haven't yet been able to find the root cause.

Thanks a lot for your help!
Benoit Hanotte

********************************* [1] SQL query *********************************

WITH displays AS (
    SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks
    FROM my_catalog.my_db.display
),
clicks AS (
    SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks
    FROM my_catalog.my_db.click
),
counts_2h AS (
    SELECT
        SUM(nb_clicks) / SUM(nb_displays) as ctr,
        HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
    FROM (
        (SELECT * FROM displays)
        UNION ALL
        (SELECT * FROM clicks)
    ) t
    GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
),
counts_6h AS (
    SELECT
        SUM(nb_clicks) / SUM(nb_displays) as ctr,
        HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR) as `timestamp`
    FROM (
        (SELECT * FROM displays)
        UNION ALL
        (SELECT * FROM clicks)
    ) t
    GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
)
SELECT
    TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
    MAX(ctr)
FROM (
    (SELECT * FROM counts_6h)
    UNION ALL
    (SELECT * FROM counts_2h)
) t
GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)

********************* [2] Logical plan (before optimization) ***********************

LogicalProject(timestamp=[TUMBLE_END($0)], EXPR$1=[$1])
  LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)])
    LogicalProject($f0=[TUMBLE($1, 3600000:INTERVAL HOUR)], ctr=[$0])
      LogicalUnion(all=[true])
        LogicalProject(ctr=[$0], timestamp=[$1])
          LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
            LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
              LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 21600000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                LogicalProject(timestamp=[$0], nb_displays=[0], nb_clicks=[1])
                  LogicalTableScan(table=[[my_catalog, my_db, click]])
        LogicalProject(ctr=[$0], timestamp=[$1])
          LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
            LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
              LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                LogicalProject(timestamp=[$0], nb_displays=[1], nb_clicks=[0])
                  LogicalTableScan(table=[[my_catalog, my_db, display]])

****************** [3] Resulting physical plan (after optimization) ********************

DataStreamCalc(select=[w$end AS timestamp, CAST(EXPR$1) AS EXPR$1]): rowcount = 400.0, cumulative cost = {3200.0 rows, 3600.0 cpu, 4800.0 io}, id = 556
  DataStreamGroupWindowAggregate(window=[TumblingGroupWindow('w$, 'timestamp, 3600000.millis)], select=[MAX(ctr) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 400.0, cumulative cost = {2800.0 rows, 3200.0 cpu, 4800.0 io}, id = 555
    DataStreamUnion(all=[true], union all=[ctr, timestamp]): rowcount = 400.0, cumulative cost = {2400.0 rows, 2800.0 cpu, 4800.0 io}, id = 554
      DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 548
        DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 547
          DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 546
            DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 544
              StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
            DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 545
              StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
      DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 553
        DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 552
          DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 551
            DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 549
              StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
            DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 550
              StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
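
In case it helps anyone comparing plans, the window check on the physical plan can be automated with a small script. This is only a rough sketch and not part of Flink: the helper names (`sliding_windows`, `missing_sizes`) are made up for illustration, and the regex assumes the `SlidingGroupWindow('w$, 'timestamp, <size>.millis, <slide>.millis)` layout seen in the plan above, with the window size printed before the slide. The excerpt is abbreviated from that plan.

```python
import re

# Assumed layout, read off the plan dump above: window size first, then slide.
SLIDING = re.compile(
    r"SlidingGroupWindow\('w\$, '\w+, (\d+)\.millis, (\d+)\.millis\)"
)

def sliding_windows(plan_text):
    """Return (size_ms, slide_ms) for every SlidingGroupWindow, in plan order."""
    return [(int(size), int(slide)) for size, slide in SLIDING.findall(plan_text)]

def missing_sizes(plan_text, expected_sizes_ms):
    """Return the expected window sizes (ms) that never appear in the plan."""
    found = {size for size, _ in sliding_windows(plan_text)}
    return sorted(set(expected_sizes_ms) - found)

HOUR_MS = 3600000

# Abbreviated excerpt of the two window aggregates from the physical plan.
plan_excerpt = """
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], ...)
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], ...)
"""

print(sliding_windows(plan_excerpt))
print(missing_sizes(plan_excerpt, [2 * HOUR_MS, 6 * HOUR_MS]))
```

On the excerpt, the second call reports 21600000 (6 hours) as missing, which matches what can be read off the plan by eye: both window aggregates use a 7200000 ms window.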