Hello Till,
thanks for your reply!
I have been able to debug the issue and reported it in 
https://issues.apache.org/jira/browse/FLINK-15577.
It seems the old planner does not add the window specs to the logical nodes' 
digests, which leads the HepPlanner to consider the aggregations equivalent 
when they are not, because they use different time windows. I explained the 
issue in more detail in the ticket above, and submitted a PR earlier 
today: https://github.com/apache/flink/pull/10854.
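
To illustrate the mechanism, here is a toy Python model of digest-based deduplication. It is only a sketch: `WindowAggregate`, `register_all` and both digest functions are hypothetical names made up for this example, not Flink or Calcite classes, and the real planner works on RelNode digests in a graph rather than on a plain dictionary.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowAggregate:
    """Toy stand-in for a logical window aggregate node (hypothetical)."""
    agg_calls: tuple
    window_size_ms: int
    slide_ms: int

    def digest_without_window(self) -> str:
        # Mimics the reported problem: the window spec is left out of the digest.
        return f"WindowAggregate(select=[{', '.join(self.agg_calls)}])"

    def digest_with_window(self) -> str:
        # Including the window spec makes the two aggregations distinguishable.
        return (f"WindowAggregate(window=[HOP(size={self.window_size_ms}, "
                f"slide={self.slide_ms})], select=[{', '.join(self.agg_calls)}])")


def register_all(nodes, digest_fn):
    """Keep one node per digest, the way a digest-keyed vertex map would."""
    by_digest = {}
    resolved = []
    for node in nodes:
        # A node whose digest is already known is replaced by the existing node.
        resolved.append(by_digest.setdefault(digest_fn(node), node))
    return resolved


HOUR_MS = 3_600_000
AGGS = ("SUM(nb_clicks)", "SUM(nb_displays)")
agg_2h = WindowAggregate(AGGS, 2 * HOUR_MS, HOUR_MS)
agg_6h = WindowAggregate(AGGS, 6 * HOUR_MS, HOUR_MS)

buggy = register_all([agg_2h, agg_6h], WindowAggregate.digest_without_window)
fixed = register_all([agg_2h, agg_6h], WindowAggregate.digest_with_window)

print([n.window_size_ms for n in buggy])  # both entries resolve to the same node
print([n.window_size_ms for n in fixed])  # the two windows stay distinct
```

With the window-less digest, the second aggregation is silently replaced by the first one, which is the kind of substitution seen in the physical plan.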
Best,
Benoit
________________________________
From: Till Rohrmann <trohrm...@apache.org>
Sent: Tuesday, January 14, 2020 7:13 PM
To: Benoit Hanotte <b.hano...@criteo.com>
Cc: user@flink.apache.org <user@flink.apache.org>; Jingsong Li 
<jingsongl...@gmail.com>; twal...@apache.org <twal...@apache.org>
Subject: [BULK]Re: Incorrect Physical Plan when unioning two different windows, 
giving incorrect SQL query results

Hi Benoit,

thanks for reporting this issue. Since I'm not too familiar with the SQL 
component, I've pulled in Timo and Jingsong, who know much better than I do 
what could be wrong.

Cheers,
Till

On Mon, Jan 13, 2020 at 11:48 AM Benoit Hanotte 
<b.hano...@criteo.com> wrote:
Hello,

We seem to be facing an issue with Flink where the physical plan after planner 
optimization is not correct.
I have been able to reproduce the issue in the following "simplified" use case 
(it doesn't seem to happen in trivial cases):

  1.  We open 2 event streams ("clicks" and "displays")
  2.  We compute the click rate (ctr) over 2-hour and 6-hour sliding windows.
  3.  We then union the two results to output one row per hour with the max of 
the values computed over 2 and 6 hours.
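
As a small illustration of why the two window sizes are not interchangeable, the Python sketch below lists the sliding windows that a single event falls into for a 1-hour slide. The helper `hop_windows` is a hypothetical name, and the sketch assumes windows aligned to the epoch with no offset.

```python
HOUR_MS = 3_600_000


def hop_windows(ts_ms, slide_ms, size_ms):
    """Return (start, end) of every sliding window containing ts_ms (end exclusive)."""
    start = ts_ms - (ts_ms % slide_ms)  # latest window start at or before the event
    windows = []
    while start > ts_ms - size_ms:      # the window must still cover the event
        windows.append((start, start + size_ms))
        start -= slide_ms
    return sorted(windows)


event = 5 * HOUR_MS + 30 * 60_000       # an event at 05:30 (in ms since epoch 0)
w2 = hop_windows(event, HOUR_MS, 2 * HOUR_MS)
w6 = hop_windows(event, HOUR_MS, 6 * HOUR_MS)

print([(s // HOUR_MS, e // HOUR_MS) for s, e in w2])
print(len(w2), len(w6))
```

The same event contributes to two windows in the 2-hour case and to six windows in the 6-hour case, so the per-hour ctr values generally differ between the two branches of the union.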

You can find the SQL query below [1].
After activating debug logging for Calcite, I can see that the original 
logical plan is valid: the top-level UNION is between two LogicalProjects, one 
for the 2-hour and one for the 6-hour HOP window [2].
However, in the final physical plan, we can see that both sides of the UNION 
now use the same HOP window (2 hours, i.e. 7200000 ms, in the plan shown) 
instead of one window over 2 hours and one over 6 hours [3].
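
One way to check this without reading the plan line by line is to extract the window specs from the explain output. The Python sketch below is only an assumption-laden helper: `sliding_windows` is a hypothetical name, the regular expression is written against the text format shown in [3], and the excerpt abbreviates the `select=[...]` part of each line.

```python
import re

# Abbreviated excerpt of the two window aggregates from [3], wrapped as in the email.
PLAN_EXCERPT = """
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$,
'timestamp, 7200000.millis, 3600000.millis)], select=[...]), id = 547
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$,
'timestamp, 7200000.millis, 3600000.millis)], select=[...]), id = 552
"""

WINDOW_RE = re.compile(
    r"SlidingGroupWindow\('w\$, '\w+, (\d+)\.millis, (\d+)\.millis\)"
)


def sliding_windows(plan_text):
    """Return (size_ms, slide_ms) for each SlidingGroupWindow in the plan text."""
    flat = " ".join(plan_text.split())  # undo the line wrapping first
    return [(int(size), int(slide)) for size, slide in WINDOW_RE.findall(flat)]


windows = sliding_windows(PLAN_EXCERPT)
print(windows)
print("distinct window sizes (ms):", sorted({size for size, _ in windows}))
```

A single distinct size for a query that declares two different HOP windows is the symptom described above.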

I pushed a commit to my fork to reproduce the issue: 
https://github.com/BenoitHanotte/flink/commit/3d388f153b44bb35b57b8400407ff24a2e91661f
Unfortunately, simplifying the query seems to make the issue disappear.

Is there anything obvious I am missing, or do you have any pointers on what 
could trigger this issue? I looked at the different rules applied by the 
planner [4], but, as I am not familiar with them, I haven't yet been able to 
find the root cause.

Thanks a lot for your help!

Benoit Hanotte

********************************* [1] SQL query *********************************

    WITH displays AS (
        SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks
        FROM my_catalog.my_db.display
    ),

    clicks AS (
        SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks
        FROM my_catalog.my_db.click
    ),

    counts_2h AS (
        SELECT
            SUM(nb_clicks) / SUM(nb_displays) as ctr,
            HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
        FROM (
            (SELECT * FROM displays)
            UNION ALL
            (SELECT * FROM clicks)
        ) t
        GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
    ),

    counts_6h AS (
        SELECT
            SUM(nb_clicks) / SUM(nb_displays) as ctr,
            HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR) as `timestamp`
        FROM (
            (SELECT * FROM displays)
            UNION ALL
            (SELECT * FROM clicks)
        ) t
        GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
    )

    SELECT
        TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
        MAX(ctr)
    FROM (
        (SELECT * FROM counts_6h)
        UNION ALL
        (SELECT * FROM counts_2h)
    ) t
    GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)


********************* [2] Logical plan (before optimization) ***********************

    LogicalProject(timestamp=[TUMBLE_END($0)], EXPR$1=[$1])
      LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)])
        LogicalProject($f0=[TUMBLE($1, 3600000:INTERVAL HOUR)], ctr=[$0])
          LogicalUnion(all=[true])
            LogicalProject(ctr=[$0], timestamp=[$1])
              LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
                LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
                  LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 21600000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                    LogicalProject(timestamp=[$0], nb_displays=[0], nb_clicks=[1])
                      LogicalTableScan(table=[[my_catalog, my_db, click]])
            LogicalProject(ctr=[$0], timestamp=[$1])
              LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
                LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
                  LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
                    LogicalProject(timestamp=[$0], nb_displays=[1], nb_clicks=[0])
                      LogicalTableScan(table=[[my_catalog, my_db, display]])


****************** [3] Resulting physical plan (after optimization) ********************

    DataStreamCalc(select=[w$end AS timestamp, CAST(EXPR$1) AS EXPR$1]): rowcount = 400.0, cumulative cost = {3200.0 rows, 3600.0 cpu, 4800.0 io}, id = 556
      DataStreamGroupWindowAggregate(window=[TumblingGroupWindow('w$, 'timestamp, 3600000.millis)], select=[MAX(ctr) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 400.0, cumulative cost = {2800.0 rows, 3200.0 cpu, 4800.0 io}, id = 555
        DataStreamUnion(all=[true], union all=[ctr, timestamp]): rowcount = 400.0, cumulative cost = {2400.0 rows, 2800.0 cpu, 4800.0 io}, id = 554
          DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 548
            DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 547
              DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 546
                DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 544
                  StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
                DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 545
                  StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
          DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 553
            DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 552
              DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 551
                DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 549
                  StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
                DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 550
                  StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
