[ https://issues.apache.org/jira/browse/FLINK-20289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu updated FLINK-20289:
----------------------------
    Description: 
In FLINK-19878, we made ChangelogNormalize be applied after WatermarkAssigner so that the watermark stays close to the source. This makes the watermark more fine-grained.

However, in some cases this may shuffle more data, because all computed column expressions may be applied before ChangelogNormalize. In the following example, {{a+1}} can be applied after ChangelogNormalize to reduce the shuffled data.

{code:sql}
CREATE TABLE src (
  id STRING,
  a INT,
  b AS a + 1,
  c STRING,
  ts as to_timestamp(c),
  PRIMARY KEY (id) NOT ENFORCED,
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'UA,D'
);

SELECT a, b, c FROM src WHERE a > 1
{code}

The current plan is:

{code}
Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
   +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
      +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
         +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
{code}

A better plan should be:

{code}
Calc(select=[a, +(a, 1) AS b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
   +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
      +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
         +- Calc(select=[id, a, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
{code}

> Computed columns can be calculated after ChangelogNormalize to reduce shuffle
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-20289
>                 URL: https://issues.apache.org/jira/browse/FLINK-20289
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Priority: Major
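
The split between computed columns that must be evaluated before the shuffle and those that can wait until after ChangelogNormalize can be sketched as a small dependency check. This is only an illustrative model under stated assumptions, not Flink planner code: the function name {{split_computed_columns}}, its parameters, and the idea of passing in the set of columns needed below the Exchange are all hypothetical.

```python
# Illustrative model only (not Flink planner code): decide which computed
# columns must be evaluated before the shuffle and which can be deferred
# until after ChangelogNormalize. All names here are hypothetical.

def split_computed_columns(computed, needed_before_shuffle):
    """Split computed columns into (early, deferred).

    computed: dict mapping a computed column name to the set of columns
        its expression references, in declaration order.
    needed_before_shuffle: columns that operators below the Exchange
        require, e.g. the rowtime column used by the WatermarkAssigner.
    """
    eager = set()
    pending = [name for name in needed_before_shuffle if name in computed]
    while pending:
        name = pending.pop()
        if name in eager:
            continue
        eager.add(name)
        # A computed column referenced by an early one must be early too.
        pending.extend(ref for ref in computed[name] if ref in computed)
    early = [name for name in computed if name in eager]
    deferred = [name for name in computed if name not in eager]
    return early, deferred


# Table from the example: b AS a + 1, ts AS to_timestamp(c).
physical = ["id", "a", "c"]
computed = {"b": {"a"}, "ts": {"c"}}

# The watermark needs ts; the primary key id is a physical column.
early, deferred = split_computed_columns(computed, {"id", "ts"})
shuffled_fields = physical + early

print(shuffled_fields)  # ['id', 'a', 'c', 'ts']
print(deferred)         # ['b']
```

In this model only {{ts}} is kept in the Calc below the WatermarkAssigner, while {{b}} moves to the Calc above ChangelogNormalize, which matches the field lists in the better plan.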