Hello,

We seem to be facing an issue with Flink where the physical plan after planner 
optimization is not correct.
I have been able to reproduce the issue in the following "simplified" use case 
(it doesn't seem to happen in trivial cases):

  1.  We open 2 event streams ("clicks" and "displays")
  2.  We compute the click-through rate (ctr) over 2-hour and 6-hour sliding 
windows.
  3.  We then union the two results and output one row per hour containing the 
maximum of the values computed over the 2-hour and 6-hour windows.

You can find the SQL query below [1].
After enabling debug logging for Calcite, I can see that the original logical 
plan is valid: the top-level UNION is between two LogicalProjects, one for the 
2hr HOP window and one for the 6hr HOP window [2].
However, in the final physical plan, both sides of the UNION use the same 2hr 
HOP window (SlidingGroupWindow with a size of 7200000 ms) instead of one window 
over 2hr and one over 6hr [3].
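To double-check which window sizes each plan contains, the HOP intervals can be extracted from the two dumps programmatically. This is a small sketch that assumes the textual formats visible in [2] and [3]: `HOP($n, <slide>:INTERVAL HOUR, <size>:INTERVAL HOUR)` in the logical plan and `SlidingGroupWindow('w$, '<field>, <size>.millis, <slide>.millis)` in the physical plan. The function names are hypothetical and not part of Flink or Calcite.

```python
import re

HOUR_MS = 3_600_000

# Logical plan (assumed format): HOP(<time field>, <slide>:INTERVAL HOUR, <size>:INTERVAL HOUR).
# \s* tolerates the line breaks that mail clients insert into long plan lines.
LOGICAL_HOP = re.compile(r"HOP\(\$\d+,\s*(\d+):INTERVAL HOUR,\s*(\d+):INTERVAL HOUR\)")
# Physical plan (assumed format): SlidingGroupWindow('w$, '<time field>, <size>.millis, <slide>.millis).
PHYSICAL_HOP = re.compile(
    r"SlidingGroupWindow\('w\$,\s*'\w+,\s*(\d+)\.millis,\s*(\d+)\.millis\)"
)


def logical_hop_sizes(plan: str) -> list[float]:
    """Window sizes (in hours) of every HOP call found in a logical plan dump."""
    return [int(size) / HOUR_MS for _slide, size in LOGICAL_HOP.findall(plan)]


def physical_hop_sizes(plan: str) -> list[float]:
    """Window sizes (in hours) of every SlidingGroupWindow found in a physical plan dump."""
    return [int(size) / HOUR_MS for size, _slide in PHYSICAL_HOP.findall(plan)]


if __name__ == "__main__":
    # Excerpts copied from the dumps in [2] and [3].
    logical = (
        "LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 21600000:INTERVAL HOUR)])\n"
        "LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)])\n"
    )
    physical = (
        "DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, "
        "7200000.millis, 3600000.millis)])\n"
        "DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, "
        "7200000.millis, 3600000.millis)])\n"
    )
    print(logical_hop_sizes(logical))    # [6.0, 2.0]
    print(physical_hop_sizes(physical))  # [2.0, 2.0]
```

Pasting the full dumps into these functions gives a quick way to confirm whether a given planner change keeps the 6-hour window in the physical plan.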

I pushed a commit to my fork that reproduces the issue:
https://github.com/BenoitHanotte/flink/commit/3d388f153b44bb35b57b8400407ff24a2e91661f
Unfortunately, simplifying the query further seems to make the issue disappear.

Is there anything obvious I am missing, or do you have any pointers to what 
could trigger this issue? I looked at the different rules applied by the 
planner [4] but, as I am not familiar with them, I haven't been able to find 
the root cause yet.

Thanks a lot for your help!

Benoit Hanotte

********************************* [1] SQL query *********************************

    WITH displays AS (
        SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks FROM my_catalog.my_db.display
    ),

    clicks AS (
        SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks FROM my_catalog.my_db.click
    ),

    counts_2h AS (
        SELECT
            SUM(nb_clicks) / SUM(nb_displays) as ctr,
            HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
        FROM (
            (SELECT * FROM displays)
            UNION ALL
            (SELECT * FROM clicks)
        ) t
        GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
    ),

    counts_6h AS (
        SELECT
            SUM(nb_clicks) / SUM(nb_displays) as ctr,
            HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR) as `timestamp`
        FROM (
            (SELECT * FROM displays)
            UNION ALL
            (SELECT * FROM clicks)
        ) t
        GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
    )

    SELECT
        TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
        MAX(ctr)
    FROM (
        (SELECT * FROM counts_6h)
        UNION ALL
        (SELECT * FROM counts_2h)
    ) t
    GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)
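To make the expected output concrete, the intended semantics of this query can be reproduced in plain Python on a few hand-made events and compared with what Flink emits. This is a minimal batch sketch, not Flink's implementation. It assumes epoch-aligned windows, takes HOP_ROWTIME as the window end minus 1 ms, uses float division for ctr, and drops windows without displays instead of modeling Flink's behavior for a zero divisor. The helper names (`hop_windows`, `windowed_ctr`, `hourly_max`) are hypothetical.

```python
from collections import defaultdict

HOUR_MS = 3_600_000


def hop_windows(ts_ms: int, slide_ms: int, size_ms: int) -> list[tuple[int, int]]:
    """[start, end) bounds of every epoch-aligned hopping window containing ts_ms."""
    start = ts_ms - ts_ms % slide_ms  # latest window start <= ts_ms
    windows = []
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


def windowed_ctr(events, size_ms: int, slide_ms: int = HOUR_MS) -> list[tuple[float, int]]:
    """Mimic counts_2h / counts_6h: SUM(nb_clicks) / SUM(nb_displays) per HOP window.

    events: (ts_ms, nb_displays, nb_clicks) tuples, i.e. the UNION ALL of the
    displays and clicks CTEs. Returns (ctr, rowtime) pairs, with rowtime taken
    as window end minus 1 ms (the assumed HOP_ROWTIME value).
    """
    sums = defaultdict(lambda: [0, 0])  # (start, end) -> [clicks, displays]
    for ts, nb_displays, nb_clicks in events:
        for window in hop_windows(ts, slide_ms, size_ms):
            sums[window][0] += nb_clicks
            sums[window][1] += nb_displays
    # Assumption of this sketch: windows without displays are dropped
    # instead of dividing by zero.
    return [(c / d, end - 1) for (_start, end), (c, d) in sums.items() if d > 0]


def hourly_max(rows) -> dict[int, float]:
    """Mimic the outer query: MAX(ctr) per 1-hour TUMBLE window, keyed by TUMBLE_END."""
    best: dict[int, float] = {}
    for ctr, rowtime in rows:
        tumble_end = rowtime - rowtime % HOUR_MS + HOUR_MS
        best[tumble_end] = max(best.get(tumble_end, ctr), ctr)
    return dict(sorted(best.items()))


if __name__ == "__main__":
    events = [
        (30 * 60_000, 1, 0),  # display at 00:30
        (40 * 60_000, 0, 1),  # click at 00:40
        (90 * 60_000, 1, 0),  # display at 01:30
        (5 * HOUR_MS, 1, 0),  # display at 05:00
    ]
    expected = hourly_max(windowed_ctr(events, 2 * HOUR_MS) + windowed_ctr(events, 6 * HOUR_MS))
    # What a plan using the 2-hour window on both sides of the UNION would compute.
    two_hour_only = hourly_max(windowed_ctr(events, 2 * HOUR_MS) * 2)
    print(expected[3 * HOUR_MS], two_hour_only[3 * HOUR_MS])  # 0.5 0.0
```

With this input the two variants already disagree for the hour ending at 03:00, so feeding the same events to the Flink job is one way to observe the wrong plan from the output alone.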


********************* [2] Logical plan (before optimization) ***********************

    LogicalProject(timestamp=[TUMBLE_END($0)], EXPR$1=[$1])
      LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)])
        LogicalProject($f0=[TUMBLE($1, 3600000:INTERVAL HOUR)], ctr=[$0])
          LogicalUnion(all=[true])
            LogicalProject(ctr=[$0], timestamp=[$1])
              LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
                LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
                  LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 21600000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                    LogicalProject(timestamp=[$0], nb_displays=[0], nb_clicks=[1])
                      LogicalTableScan(table=[[my_catalog, my_db, click]])
            LogicalProject(ctr=[$0], timestamp=[$1])
              LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
                LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
                  LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                    LogicalProject(timestamp=[$0], nb_displays=[1], nb_clicks=[0])
                      LogicalTableScan(table=[[my_catalog, my_db, display]])


****************** [3] Resulting physical plan (after optimization) ********************

    DataStreamCalc(select=[w$end AS timestamp, CAST(EXPR$1) AS EXPR$1]): rowcount = 400.0, cumulative cost = {3200.0 rows, 3600.0 cpu, 4800.0 io}, id = 556
      DataStreamGroupWindowAggregate(window=[TumblingGroupWindow('w$, 'timestamp, 3600000.millis)], select=[MAX(ctr) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 400.0, cumulative cost = {2800.0 rows, 3200.0 cpu, 4800.0 io}, id = 555
        DataStreamUnion(all=[true], union all=[ctr, timestamp]): rowcount = 400.0, cumulative cost = {2400.0 rows, 2800.0 cpu, 4800.0 io}, id = 554
          DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 548
            DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 547
              DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 546
                DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 544
                  StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
                DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 545
                  StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
          DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 553
            DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 552
              DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 551
                DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 549
                  StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
                DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 550
                  StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
