Aitozi created FLINK-31205:
------------------------------
Summary: do optimize for multi sink in a single relNode tree
Key: FLINK-31205
URL: https://issues.apache.org/jira/browse/FLINK-31205
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Aitozi
Flink supports writing to multiple sinks, but it optimizes each sink in an individual RelNode tree. This misses opportunities for cross-tree optimization. For example:

{code:java}
create table newX(
  a int,
  b bigint,
  c varchar,
  d varchar,
  e varchar
) with (
  'connector' = 'values',
  'enable-projection-push-down' = 'true'
);

insert into sink_table select a, b from newX;
insert into sink_table select a, 1 from newX;
{code}

This produces the plan below, in which the source is consumed twice:

{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])

Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Calc(select=[a, 1 AS b])
   +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a], metadata=[]]], fields=[a])
{code}

Because projection push-down is applied to each tree separately, the two scans of newX end up with different projections ([a, b] and [a]), so they cannot be shared.

In this ticket, I propose a global optimization for multiple sinks:
* Merge the sinks that write to the same table into a single RelNode tree with an extra Union node.
* After optimization, split the merged Union back into the original sinks.

In my PoC, step 1 produces the plan below, in which both branches read a single reused scan. I think this benefits overall performance:

{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Union(all=[true], union=[a, b])
   :- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
   +- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
      +- Reused(reference_id=[1])
{code}
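The merge/optimize/split idea can be illustrated with a toy model. This is a minimal sketch assuming Java 17 (records and pattern matching); the node types Scan, Calc, MergedUnion and Sink and the methods merge, optimize, split and distinctScans are hypothetical simplifications, not Flink's RelNode or planner API. The optimize step is only a stand-in for projection push-down over the merged tree: it widens every scan of a table to the union of the fields its branches read, so the two scans of newX become identical and could be shared.

{code:java}
import java.util.*;

// Toy model, assuming Java 17. All node types here are hypothetical, not Flink's RelNode API.
public class MultiSinkMerge {
    interface Node {}
    record Scan(String table, List<String> fields) implements Node {}
    record Calc(List<String> exprs, Node input) implements Node {}
    // Union added by merge(); kept distinct from a user-written UNION ALL so split() only undoes its own work.
    record MergedUnion(List<Node> inputs) implements Node {}
    record Sink(String table, Node input) implements Node {}

    // Step 1: put all sinks writing to the same table under one Sink over a MergedUnion.
    static List<Sink> merge(List<Sink> sinks) {
        Map<String, List<Node>> byTable = new LinkedHashMap<>();
        for (Sink s : sinks) {
            byTable.computeIfAbsent(s.table(), t -> new ArrayList<>()).add(s.input());
        }
        List<Sink> merged = new ArrayList<>();
        byTable.forEach((table, inputs) -> merged.add(inputs.size() == 1
                ? new Sink(table, inputs.get(0))
                : new Sink(table, new MergedUnion(inputs))));
        return merged;
    }

    // Collects every Scan in the tree (duplicates included).
    static void collectScans(Node node, List<Scan> out) {
        if (node instanceof Scan s) out.add(s);
        else if (node instanceof Calc c) collectScans(c.input(), out);
        else if (node instanceof MergedUnion u) u.inputs().forEach(n -> collectScans(n, out));
        else if (node instanceof Sink k) collectScans(k.input(), out);
    }

    // Stand-in for push-down over the merged tree: widen each scan to the union of fields
    // read by any branch, adding a Calc that re-projects the fields a narrower branch used.
    static Node widenScans(Node node, Map<String, List<String>> widest) {
        if (node instanceof Scan s) {
            Scan wide = new Scan(s.table(), widest.get(s.table()));
            return s.equals(wide) ? wide : new Calc(s.fields(), wide);
        }
        if (node instanceof Calc c) return new Calc(c.exprs(), widenScans(c.input(), widest));
        if (node instanceof MergedUnion u) {
            return new MergedUnion(u.inputs().stream().map(n -> widenScans(n, widest)).toList());
        }
        if (node instanceof Sink k) return new Sink(k.table(), widenScans(k.input(), widest));
        throw new IllegalArgumentException("unknown node: " + node);
    }

    static List<Sink> optimize(List<Sink> merged) {
        List<Scan> scans = new ArrayList<>();
        merged.forEach(s -> collectScans(s, scans));
        Map<String, Set<String>> fields = new LinkedHashMap<>();
        for (Scan s : scans) {
            fields.computeIfAbsent(s.table(), t -> new LinkedHashSet<>()).addAll(s.fields());
        }
        Map<String, List<String>> widest = new HashMap<>();
        fields.forEach((t, f) -> widest.put(t, List.copyOf(f)));
        return merged.stream().map(s -> (Sink) widenScans(s, widest)).toList();
    }

    // Step 2: split each MergedUnion back into one Sink per original branch.
    static List<Sink> split(List<Sink> optimized) {
        List<Sink> out = new ArrayList<>();
        for (Sink s : optimized) {
            if (s.input() instanceof MergedUnion u) {
                u.inputs().forEach(in -> out.add(new Sink(s.table(), in)));
            } else {
                out.add(s);
            }
        }
        return out;
    }

    // Number of physically different scans; equal scans are candidates for reuse.
    static int distinctScans(List<Sink> sinks) {
        List<Scan> scans = new ArrayList<>();
        sinks.forEach(s -> collectScans(s, scans));
        return new HashSet<>(scans).size();
    }
}
{code}

For the two inserts above, merge() yields a single sink over a MergedUnion, and after optimize() and split() both sinks read an identical Scan(newX, [a, b]), the second one through a Calc that re-projects [a]. Using a dedicated MergedUnion marker rather than a plain union node is a design choice of this sketch: it keeps split() from breaking apart a UNION ALL the user actually wrote.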