Hi Benoit, thanks for reporting this issue. Since I'm not too familiar with the SQL component, I've pulled in Timo and Jingsong, who know much better than I do what could be wrong.
Cheers,
Till

On Mon, Jan 13, 2020 at 11:48 AM Benoit Hanotte <b.hano...@criteo.com> wrote:

> Hello,
>
> We seem to be facing an issue with Flink where the physical plan produced
> by the planner's optimization is not correct.
> I have been able to reproduce the issue in the following "simplified" use
> case (it doesn't seem to happen in trivial cases):
>
> 1. We open 2 event streams ("clicks" and "displays").
> 2. We compute the click-through rate (ctr) over 2-hour and 6-hour sliding
>    windows.
> 3. We then union both results and output one row per hour with the max of
>    the values computed over 2 and 6 hours.
>
> You can find the SQL query below [1].
> After activating debug logging for Calcite, I can see that the original
> logical plan is valid: the top-level UNION is between two LogicalProjects,
> for the 2hr and 6hr HOP windows [2].
> However, in the final physical plan, both sides of the UNION now use the
> same 2hr window (SlidingGroupWindow with a size of 7200000.millis) instead
> of one window over 2hr and one over 6hr [3].
>
> I pushed a commit to my fork to reproduce the issue:
> https://github.com/BenoitHanotte/flink/commit/3d388f153b44bb35b57b8400407ff24a2e91661f
> Unfortunately, simplifying the query seems to make the issue disappear.
>
> Is there anything obvious I am missing, or do you have any pointers to what
> could trigger this issue? I looked at the different rules applied by the
> planner [4], but, as I am not familiar with them, I haven't yet been able
> to find the root cause.
>
> Thanks a lot for your help!
>
> Benoit Hanotte
>
> ********************************* [1] SQL query *********************************
>
> WITH displays AS (
>     SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks
>     FROM my_catalog.my_db.display
> ),
>
> clicks AS (
>     SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks
>     FROM my_catalog.my_db.click
> ),
>
> counts_2h AS (
>     SELECT
>         SUM(nb_clicks) / SUM(nb_displays) as ctr,
>         HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
>     FROM (
>         (SELECT * FROM displays)
>         UNION ALL
>         (SELECT * FROM clicks)
>     ) t
>     GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
> ),
>
> counts_6h AS (
>     SELECT
>         SUM(nb_clicks) / SUM(nb_displays) as ctr,
>         HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR) as `timestamp`
>     FROM (
>         (SELECT * FROM displays)
>         UNION ALL
>         (SELECT * FROM clicks)
>     ) t
>     GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
> )
>
> SELECT
>     TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
>     MAX(ctr)
> FROM (
>     (SELECT * FROM counts_6h)
>     UNION ALL
>     (SELECT * FROM counts_2h)
> ) t
> GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)
>
>
> ********************* [2] Logical plan (before optimization) ***********************
>
> LogicalProject(timestamp=[TUMBLE_END($0)], EXPR$1=[$1])
>   LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)])
>     LogicalProject($f0=[TUMBLE($1, 3600000:INTERVAL HOUR)], ctr=[$0])
>       LogicalUnion(all=[true])
>         LogicalProject(ctr=[$0], timestamp=[$1])
>           LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
>             LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
>               LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 21600000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
>                 LogicalProject(timestamp=[$0], nb_displays=[0], nb_clicks=[1])
>                   LogicalTableScan(table=[[my_catalog, my_db, click]])
>         LogicalProject(ctr=[$0], timestamp=[$1])
>           LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
>             LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
>               LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
>                 LogicalProject(timestamp=[$0], nb_displays=[1], nb_clicks=[0])
>                   LogicalTableScan(table=[[my_catalog, my_db, display]])
>
>
> ****************** [3] Resulting physical plan (after optimization) ********************
>
> DataStreamCalc(select=[w$end AS timestamp, CAST(EXPR$1) AS EXPR$1]): rowcount = 400.0, cumulative cost = {3200.0 rows, 3600.0 cpu, 4800.0 io}, id = 556
>   DataStreamGroupWindowAggregate(window=[TumblingGroupWindow('w$, 'timestamp, 3600000.millis)], select=[MAX(ctr) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 400.0, cumulative cost = {2800.0 rows, 3200.0 cpu, 4800.0 io}, id = 555
>     DataStreamUnion(all=[true], union all=[ctr, timestamp]): rowcount = 400.0, cumulative cost = {2400.0 rows, 2800.0 cpu, 4800.0 io}, id = 554
>       DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 548
>         DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 547
>           DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 546
>             DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 544
>               StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
>             DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 545
>               StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
>       DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 553
>         DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 552
>           DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 551
>             DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 549
>               StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
>             DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 550
>               StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
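To make the intended semantics of the query concrete, here is a small Python sketch (plain Python, not Flink code) of what the two HOP branches and the outer TUMBLE/MAX are expected to compute. It is only an illustration under stated assumptions: hopping windows with a zero offset, floating-point division for `ctr` (the exact numeric type Flink uses is not covered here), and windows without any display are skipped to avoid dividing by zero. The function names `hop_ctr` and `hourly_max_ctr` and the sample events are hypothetical.

```python
from collections import defaultdict

HOUR_MS = 3_600_000


def hop_ctr(events, size_ms, slide_ms=HOUR_MS):
    """Click-through rate per hopping window, keyed by window end (ms).

    events: iterable of (timestamp_ms, nb_displays, nb_clicks) tuples.
    Mimics HOP(`timestamp`, slide, size) with a zero offset (an assumption):
    an event belongs to every window [start, start + size_ms) whose start
    is a multiple of slide_ms and that contains the event's timestamp.
    """
    displays = defaultdict(int)
    clicks = defaultdict(int)
    for ts, nb_displays, nb_clicks in events:
        # Latest window start that is <= ts.
        start = ts - ts % slide_ms
        # Step back one slide at a time while the window still contains ts.
        while start > ts - size_ms:
            end = start + size_ms
            displays[end] += nb_displays
            clicks[end] += nb_clicks
            start -= slide_ms
    # Assumption: windows without displays are skipped (no division by zero).
    return {end: clicks[end] / displays[end] for end in displays if displays[end] > 0}


def hourly_max_ctr(events):
    """Per-hour max of the 2h and 6h CTRs, as the outer TUMBLE query intends.

    HOP_ROWTIME is the window end minus 1 ms, so a window ending at E falls
    into the 1h tumbling window whose TUMBLE_END is E.
    """
    ctr_2h = hop_ctr(events, 2 * HOUR_MS)
    ctr_6h = hop_ctr(events, 6 * HOUR_MS)
    result = {}
    for series in (ctr_2h, ctr_6h):
        for end, ctr in series.items():
            rowtime = end - 1  # HOP_ROWTIME of this hopping window
            tumble_end = (rowtime // HOUR_MS + 1) * HOUR_MS
            result[tumble_end] = max(result.get(tumble_end, float("-inf")), ctr)
    return result


if __name__ == "__main__":
    # Hypothetical sample events: (timestamp_ms, nb_displays, nb_clicks)
    sample = [(1_800_000, 1, 0), (2_400_000, 1, 0), (16_200_000, 1, 0), (17_100_000, 0, 1)]
    for end, ctr in sorted(hourly_max_ctr(sample).items()):
        print(end // HOUR_MS, ctr)
```

With these sample events, the 2h branch only yields windows ending at hours 1, 2, 5 and 6, while the 6h branch also yields windows ending at hours 3, 4 and 7 to 10, so the intended query emits a row for every hour from 1 to 10. If both branches used a 7200000 ms window, as in the physical plan [3], only hours 1, 2, 5 and 6 would appear.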