[ https://issues.apache.org/jira/browse/FLINK-20289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu updated FLINK-20289:
----------------------------
    Description: 
In FLINK-19878, we changed the planner to apply ChangelogNormalize after WatermarkAssigner, so that watermarks are generated close to the source. This makes the watermarks more fine-grained.

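However, in some cases this shuffles more data than necessary, because all computed column expressions are evaluated before ChangelogNormalize, i.e. before the hash Exchange on the primary key that feeds it. In the following example, {{a + 1}} could be evaluated after ChangelogNormalize, so that column {{b}} does not need to be shuffled.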

{code:sql}
CREATE TABLE src (
  id STRING,
  a INT,
  b AS a + 1,
  c STRING,
  ts AS TO_TIMESTAMP(c),
  PRIMARY KEY (id) NOT ENFORCED,
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'UA,D'
);

SELECT a, b, c FROM src WHERE a > 1;
{code}


The current plan is:

{code}
Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
   +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
      +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
         +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
{code}

A better plan would evaluate {{b}} after ChangelogNormalize, so that only {{id}}, {{a}}, {{c}} and {{ts}} are shuffled:

{code}
Calc(select=[a, +(a, 1) AS b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
   +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
      +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
         +- Calc(select=[id, a, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
{code}
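To make the idea concrete, here is a minimal Python sketch of how a planner rule could decide which computed columns to defer. It is an illustration under stated assumptions, not the planner's implementation: {{ComputedColumn}} and {{split_projection}} are hypothetical names, and the sketch assumes the rowtime column is the only computed column that must exist below the Exchange (because WatermarkAssigner needs it), while every other computed column referenced by the final Calc can be evaluated after ChangelogNormalize.

{code:python}
from dataclasses import dataclass


@dataclass(frozen=True)
class ComputedColumn:
    """Hypothetical model of a computed column: its name, its expression text
    (for display only) and the physical columns the expression reads."""
    name: str
    expr: str
    refs: frozenset


def split_projection(physical, computed, rowtime, key, downstream_refs):
    """Hypothetical helper sketching the proposed rule (not Flink's API).

    Returns (shuffled, deferred):
      shuffled -- columns that cross the hash Exchange before ChangelogNormalize
      deferred -- computed columns evaluated after ChangelogNormalize
    Assumption: the rowtime column is the only computed column needed below the Exchange.
    """
    # The rowtime column feeds WatermarkAssigner, which sits below the Exchange,
    # so it has to be evaluated early.
    early = [c.name for c in computed if c.name == rowtime]
    # Other computed columns used by the final Calc can wait until after
    # ChangelogNormalize; unused ones are pruned entirely.
    deferred = [c for c in computed if c.name != rowtime and c.name in downstream_refs]
    # Physical columns to shuffle: the key, the physical columns the final Calc
    # reads directly, and the inputs of every deferred expression.
    needed = set(key) | (set(downstream_refs) & set(physical))
    for c in deferred:
        needed |= c.refs
    shuffled = [p for p in physical if p in needed] + early
    return shuffled, [c.name for c in deferred]


# Table `src` and the query `SELECT a, b, c FROM src WHERE a > 1` from above.
PHYSICAL = ["id", "a", "c"]
COMPUTED = [
    ComputedColumn("b", "a + 1", frozenset({"a"})),
    ComputedColumn("ts", "TO_TIMESTAMP(c)", frozenset({"c"})),
]
shuffled, deferred = split_projection(
    PHYSICAL, COMPUTED, rowtime="ts", key=["id"], downstream_refs={"a", "b", "c"}
)
print("shuffled:", shuffled)  # shuffled: ['id', 'a', 'c', 'ts']
print("after ChangelogNormalize:", deferred)  # after ChangelogNormalize: ['b']
{code}

The result matches the shuffled columns of the better plan. The sketch ignores expression cost and simply defers everything that is not required below the Exchange; a real rule might also want to keep expensive expressions that shrink the row (e.g. ones replacing several wide input columns) before the shuffle.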





> Computed columns can be calculated after ChangelogNormalize to reduce shuffle
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-20289
>                 URL: https://issues.apache.org/jira/browse/FLINK-20289
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
