[ https://issues.apache.org/jira/browse/FLINK-31205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694372#comment-17694372 ]
Aitozi commented on FLINK-31205:
--------------------------------

After some research, I found a better option than using a union to get a single tree. {{Union}} only covers the case where multiple sinks write to the same table, because {{Union}} enforces type consistency across its inputs.

Instead, we can add a new "virtual" RelNode that accepts the multiple sinks as its inputs. It packs the separate trees together so that, from the optimizer's perspective, global optimization becomes possible.

In my POC, I wrap the sinks in a new RelNode type named {{MultiSink}} before passing the plan to the Calcite optimizer. The {{MultiSink}} does not apply any transformation to its inputs.

After logical optimization, the plan is
{code:java}
LogicalMultiSink
:- LogicalSink(table=[default_catalog.default_database.sink_table], fields=[a, b])
:  +- LogicalProject(inputs=[0..1])
:     +- LogicalTableScan(table=[[default_catalog, default_database, newX]])
+- LogicalSink(table=[default_catalog.default_database.sink_table], fields=[a, b])
   +- LogicalProject(inputs=[0], exprs=[[1:BIGINT]])
      +- LogicalTableScan(table=[[default_catalog, default_database, newX]])
{code}

After physical optimization, the plan is
{code:java}
MultiSink
:- Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
:  +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])
+- Sink(table=[default_catalog.default_database.sink_table], fields=[$f0, $f1])
   +- Calc(select=[a AS $f0, 1:BIGINT AS $f1])
      +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])
{code}

Before translating to ExecNodes, we remove the {{MultiSink}} (it is only intended to exist during the optimization phase), so the final result is
{code:java}
TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])

Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Reused(reference_id=[1])

Sink(table=[default_catalog.default_database.sink_table], fields=[$f0, $f1])
+- Calc(select=[a AS $f0, 1 AS $f1])
   +- Reused(reference_id=[1])
{code}

With the new RelNode, single-tree optimization is possible. We can then do more during single-tree optimization, e.g., introduce a cost model for CTEs to decide whether to inline or reuse them.

> do optimize for multi sink in a single relNode tree
> ----------------------------------------------------
>
>                 Key: FLINK-31205
>                 URL: https://issues.apache.org/jira/browse/FLINK-31205
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Aitozi
>            Priority: Major
>
> Flink supports multi-sink usage, but it optimizes each sink in an
> individual RelNode tree. This misses some opportunities for cross-tree
> optimization, e.g.:
> {code:java}
> create table newX(
>   a int,
>   b bigint,
>   c varchar,
>   d varchar,
>   e varchar
> ) with (
>   'connector' = 'values'
>   ,'enable-projection-push-down' = 'true'
> );
> insert into sink_table select a, b from newX;
> insert into sink_table select a, 1 from newX;
> {code}
> It produces the plan below, which causes the source to be consumed twice:
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])
>
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Calc(select=[a, 1 AS b])
>    +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a], metadata=[]]], fields=[a])
> {code}
> In this ticket, I propose doing a global optimization for the multi sink by
> * Merging the multiple sinks (with the same table) into a single RelNode tree with an extra union node
> * After optimization, splitting the merged union back into the original multiple sinks
>
> In my POC, after step 1, it produces the plan below. I think it will
> benefit overall performance:
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Union(all=[true], union=[a, b])
>    :- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
>    +- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
>       +- Reused(reference_id=[1])
> {code}
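
As a rough illustration of the {{MultiSink}} idea from the comment above, the pack-then-unpack step could look roughly like the sketch below. It uses plain Java stand-ins rather than Calcite or Flink planner classes: {{Node}}, {{pack}}, {{unpack}} and {{explain}} are hypothetical names chosen for this example only, and the optimizer itself is left out. The point is only that the virtual root holds the sink trees unchanged, so removing it afterwards returns the original sinks in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MultiSinkSketch {

    /** Hypothetical stand-in for a plan node; this is NOT Calcite's RelNode. */
    static class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, List<Node> inputs) {
            this.name = name;
            this.inputs = inputs;
        }
    }

    /** Virtual root: keeps the sink trees as inputs and does not transform them. */
    static final class MultiSink extends Node {
        MultiSink(List<Node> sinks) {
            super("MultiSink", sinks);
        }
    }

    /** Packs several sink trees under one root so they can be handled as a single tree. */
    static Node pack(List<Node> sinkTrees) {
        return new MultiSink(new ArrayList<>(sinkTrees));
    }

    /** Removes the virtual root again; a tree without a MultiSink root is returned as-is. */
    static List<Node> unpack(Node root) {
        if (root instanceof MultiSink) {
            return new ArrayList<>(root.inputs);
        }
        return Collections.singletonList(root);
    }

    /** Renders a tree with two spaces of indentation per level. */
    static String explain(Node node) {
        StringBuilder sb = new StringBuilder();
        explain(node, 0, sb);
        return sb.toString();
    }

    private static void explain(Node node, int depth, StringBuilder sb) {
        for (int i = 0; i < depth; i++) {
            sb.append("  ");
        }
        sb.append(node.name).append('\n');
        for (Node input : node.inputs) {
            explain(input, depth + 1, sb);
        }
    }

    public static void main(String[] args) {
        // Both sinks read from the same scan node, as in the example plans.
        Node scan = new Node("TableSourceScan", Collections.<Node>emptyList());
        Node sink1 = new Node("Sink", Arrays.asList(scan));
        Node calc = new Node("Calc", Arrays.asList(scan));
        Node sink2 = new Node("Sink", Arrays.asList(calc));

        Node root = pack(Arrays.asList(sink1, sink2));
        System.out.print(explain(root));

        // A real planner would optimize `root` here before unpacking.
        for (Node sink : unpack(root)) {
            System.out.print(explain(sink));
        }
    }
}
```

In this sketch the shared scan is simply the same object referenced from both sink trees; how the planner detects and reuses common sub-plans is outside its scope.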