xuyangzhong commented on code in PR #26379:
URL: https://github.com/apache/flink/pull/26379#discussion_r2048056880


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -105,6 +105,17 @@ public class OptimizerConfigOptions {
                                     + 
TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key()
                                     + " is true.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+    public static final ConfigOption<Boolean> 
TABLE_OPTIMIZER_REUSE_SINK_ENABLED =
+            key("table.optimizer.reuse-sink-enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When it is true, the optimizer will try to find 
duplicated table sinks and "

Review Comment:
   nit: use `find out` here to align with the description of `TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED`.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This checks if we can find duplicate sinks that can be reused. If so, only 
one of these sinks
+ * will be used. This is an optimization so that we do not need to process 
multiple sinks that are
+ * actually representing the same destination table.
+ *
+ * <p>This optimization is only used in the STATEMENT SET clause with multiple 
INSERT INTO.
+ *
+ * <p>Examples in SQL look like:
+ *
+ * <pre>{@code
+ * BEGIN STATEMENT SET;
+ * INSERT INTO sink1 SELECT * FROM source1;
+ * INSERT INTO sink1 SELECT * FROM source2;
+ * INSERT INTO sink2 SELECT * FROM source3;
+ * END;
+ * }</pre>
+ *
+ * <p>The plan is as follows:
+ *
+ * <pre>{@code
+ * TableScan1 —— Sink1
+ * TableScan2 —— Sink1
+ * TableScan3 —— Sink2
+ * }</pre>
+ *
+ * <p>After reused, the plan will be changed as follows:
+ *
+ * <pre>{@code
+ * TableScan1 --\
+ *               Union -- Sink1
+ * TableScan2 --/
+ *
+ * TableScan3 —— Sink3
+ * }</pre>
+ */
+public class SinkReuser {
+
+    public List<RelNode> reuseDuplicatedSink(List<RelNode> relNodes) {
+        // Find all sinks
+        List<Sink> allSinkNodes =
+                relNodes.stream()
+                        .filter(node -> node instanceof Sink)
+                        .map(node -> (Sink) node)
+                        .collect(Collectors.toList());
+        List<ReusableSinkGroup> reusableSinkGroups = 
groupReusableSink(allSinkNodes);
+
+        Set<Sink> reusedSinkNodes = reuseSinkAndAddUnion(reusableSinkGroups);
+
+        // Remove all unused sink nodes
+        return relNodes.stream()
+                .filter(root -> !(root instanceof Sink) || 
reusedSinkNodes.contains(root))
+                .collect(Collectors.toList());
+    }
+
+    private Set<Sink> reuseSinkAndAddUnion(List<ReusableSinkGroup> 
reusableSinkGroups) {
+        final Set<Sink> reusedSinkNodes = Collections.newSetFromMap(new 
IdentityHashMap<>());
+        reusableSinkGroups.forEach(
+                group -> {
+                    List<Sink> originalSinks = group.originalSinks;
+                    if (originalSinks.size() <= 1) {
+                        Preconditions.checkState(originalSinks.size() == 1);
+                        reusedSinkNodes.add(originalSinks.get(0));
+                        return;
+                    }
+                    List<RelNode> allSinkInputs = new ArrayList<>();
+                    for (Sink sinkNode : originalSinks) {
+                        allSinkInputs.add(sinkNode.getInput());
+                    }
+
+                    // Use the first sink node as the final reused sink node
+                    Sink reusedSink = originalSinks.get(0);
+
+                    Union unionForReusedSinks;
+
+                    unionForReusedSinks =
+                            new BatchPhysicalUnion(
+                                    reusedSink.getCluster(),
+                                    group.inputTraitSet,
+                                    allSinkInputs,
+                                    true,
+                                    // use sink input row type
+                                    reusedSink.getRowType());
+
+                    reusedSink.replaceInput(0, unionForReusedSinks);
+                    reusedSinkNodes.add(reusedSink);
+                });
+        return reusedSinkNodes;
+    }
+
+    /**
+     * Grouping sinks that can be reused with each other.
+     *
+     * @param allSinkNodes in the plan.
+     * @return a list contains all grouped sink.
+     */
+    private List<ReusableSinkGroup> groupReusableSink(List<Sink> allSinkNodes) 
{
+        List<ReusableSinkGroup> reusableSinkGroups = new ArrayList<>();
+
+        for (Sink currentSinkNode : allSinkNodes) {
+            Optional<ReusableSinkGroup> targetGroup =
+                    reusableSinkGroups.stream()
+                            .filter(
+                                    reusableSinkGroup ->
+                                            
reusableSinkGroup.canBeReused(currentSinkNode))
+                            .findFirst();
+
+            if (targetGroup.isPresent()) {
+                targetGroup.get().originalSinks.add(currentSinkNode);
+            } else {
+                // If the current sink cannot be reused with any existing 
groups, create a new
+                // group.
+                reusableSinkGroups.add(new ReusableSinkGroup(currentSinkNode));
+            }
+        }
+        return reusableSinkGroups;
+    }
+
+    public String getDigest(Sink sink) {

Review Comment:
   Could this part be replaced by `StreamPhysicalSink#getRelDetailedDescription` and `BatchPhysicalSink#getRelDetailedDescription`, which can print the sink digest without the input information?
   



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This checks if we can find duplicate sinks that can be reused. If so, only 
one of these sinks
+ * will be used. This is an optimization so that we do not need to process 
multiple sinks that are
+ * actually representing the same destination table.
+ *
+ * <p>This optimization is only used in the STATEMENT SET clause with multiple 
INSERT INTO.
+ *
+ * <p>Examples in SQL look like:
+ *
+ * <pre>{@code
+ * BEGIN STATEMENT SET;
+ * INSERT INTO sink1 SELECT * FROM source1;
+ * INSERT INTO sink1 SELECT * FROM source2;
+ * INSERT INTO sink2 SELECT * FROM source3;
+ * END;
+ * }</pre>
+ *
+ * <p>The plan is as follows:
+ *
+ * <pre>{@code
+ * TableScan1 —— Sink1
+ * TableScan2 —— Sink1
+ * TableScan3 —— Sink2
+ * }</pre>
+ *
+ * <p>After reused, the plan will be changed as follows:
+ *
+ * <pre>{@code
+ * TableScan1 --\
+ *               Union -- Sink1
+ * TableScan2 --/
+ *
+ * TableScan3 —— Sink3
+ * }</pre>
+ */
+public class SinkReuser {
+
+    public List<RelNode> reuseDuplicatedSink(List<RelNode> relNodes) {
+        // Find all sinks
+        List<Sink> allSinkNodes =
+                relNodes.stream()
+                        .filter(node -> node instanceof Sink)
+                        .map(node -> (Sink) node)
+                        .collect(Collectors.toList());
+        List<ReusableSinkGroup> reusableSinkGroups = 
groupReusableSink(allSinkNodes);
+
+        Set<Sink> reusedSinkNodes = reuseSinkAndAddUnion(reusableSinkGroups);
+
+        // Remove all unused sink nodes
+        return relNodes.stream()
+                .filter(root -> !(root instanceof Sink) || 
reusedSinkNodes.contains(root))
+                .collect(Collectors.toList());
+    }
+
+    private Set<Sink> reuseSinkAndAddUnion(List<ReusableSinkGroup> 
reusableSinkGroups) {
+        final Set<Sink> reusedSinkNodes = Collections.newSetFromMap(new 
IdentityHashMap<>());
+        reusableSinkGroups.forEach(
+                group -> {
+                    List<Sink> originalSinks = group.originalSinks;
+                    if (originalSinks.size() <= 1) {
+                        Preconditions.checkState(originalSinks.size() == 1);
+                        reusedSinkNodes.add(originalSinks.get(0));
+                        return;
+                    }
+                    List<RelNode> allSinkInputs = new ArrayList<>();
+                    for (Sink sinkNode : originalSinks) {
+                        allSinkInputs.add(sinkNode.getInput());
+                    }
+
+                    // Use the first sink node as the final reused sink node
+                    Sink reusedSink = originalSinks.get(0);
+
+                    Union unionForReusedSinks;
+
+                    unionForReusedSinks =
+                            new BatchPhysicalUnion(
+                                    reusedSink.getCluster(),
+                                    group.inputTraitSet,
+                                    allSinkInputs,
+                                    true,
+                                    // use sink input row type
+                                    reusedSink.getRowType());
+
+                    reusedSink.replaceInput(0, unionForReusedSinks);
+                    reusedSinkNodes.add(reusedSink);
+                });
+        return reusedSinkNodes;
+    }
+
+    /**
+     * Grouping sinks that can be reused with each other.
+     *
+     * @param allSinkNodes in the plan.
+     * @return a list contains all grouped sink.
+     */
+    private List<ReusableSinkGroup> groupReusableSink(List<Sink> allSinkNodes) 
{
+        List<ReusableSinkGroup> reusableSinkGroups = new ArrayList<>();
+
+        for (Sink currentSinkNode : allSinkNodes) {
+            Optional<ReusableSinkGroup> targetGroup =
+                    reusableSinkGroups.stream()
+                            .filter(
+                                    reusableSinkGroup ->
+                                            
reusableSinkGroup.canBeReused(currentSinkNode))
+                            .findFirst();
+
+            if (targetGroup.isPresent()) {
+                targetGroup.get().originalSinks.add(currentSinkNode);
+            } else {
+                // If the current sink cannot be reused with any existing 
groups, create a new
+                // group.
+                reusableSinkGroups.add(new ReusableSinkGroup(currentSinkNode));
+            }
+        }
+        return reusableSinkGroups;
+    }
+
+    public String getDigest(Sink sink) {
+        List<String> digest = new ArrayList<>();
+        
digest.add(sink.contextResolvedTable().getIdentifier().asSummaryString());
+
+        int[][] targetColumns = sink.targetColumns();
+        if (targetColumns != null && targetColumns.length > 0) {
+            digest.add(
+                    "targetColumns=["
+                            + Arrays.stream(targetColumns)
+                                    .map(Arrays::toString)
+                                    .collect(Collectors.joining(","))
+                            + "]");
+        }
+
+        String fields =
+                sink.getRowType().getFieldList().stream()
+                        .map(f -> f.getType().toString())
+                        .collect(Collectors.joining(", "));
+        digest.add("fields=[" + fields + "]");
+        if (!sink.hints().isEmpty()) {
+            digest.add("hints=" + RelExplainUtil.hintsToString(sink.hints()));
+        }
+
+        return digest.toString();
+    }
+
+    private class ReusableSinkGroup {
+        private final List<Sink> originalSinks = new ArrayList<>();
+
+        private final SinkAbilitySpec[] sinkAbilitySpecs;
+
+        private final RelTraitSet inputTraitSet;
+
+        private final String digest;
+
+        ReusableSinkGroup(Sink sink) {
+            this.originalSinks.add(sink);
+            this.sinkAbilitySpecs = ((BatchPhysicalSink) sink).abilitySpecs();
+            this.inputTraitSet = sink.getInput().getTraitSet();
+            this.digest = getDigest(sink);
+        }
+
+        public boolean canBeReused(Sink sinkNode) {
+            String currentSinkDigest = getDigest(sinkNode);
+            SinkAbilitySpec[] currentSinkSpecs = ((BatchPhysicalSink) 
sinkNode).abilitySpecs();
+            RelTraitSet currentInputTraitSet = 
sinkNode.getInput().getTraitSet();
+
+            // Only table sink with the same digest, specs and input trait set 
can be reused

Review Comment:
   How about also including this description in the class-level Javadoc?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This checks if we can find duplicate sinks that can be reused. If so, only 
one of these sinks
+ * will be used. This is an optimization so that we do not need to process 
multiple sinks that are

Review Comment:
   nit: `If so, these duplicate sinks will be merged into one.`



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This checks if we can find duplicate sinks that can be reused. If so, only 
one of these sinks
+ * will be used. This is an optimization so that we do not need to process 
multiple sinks that are
+ * actually representing the same destination table.
+ *
+ * <p>This optimization is only used in the STATEMENT SET clause with multiple 
INSERT INTO.
+ *
+ * <p>Examples in SQL look like:
+ *
+ * <pre>{@code
+ * BEGIN STATEMENT SET;
+ * INSERT INTO sink1 SELECT * FROM source1;
+ * INSERT INTO sink1 SELECT * FROM source2;
+ * INSERT INTO sink2 SELECT * FROM source3;
+ * END;
+ * }</pre>
+ *
+ * <p>The plan is as follows:
+ *
+ * <pre>{@code
+ * TableScan1 —— Sink1
+ * TableScan2 —— Sink1
+ * TableScan3 —— Sink2
+ * }</pre>
+ *
+ * <p>After reused, the plan will be changed as follows:
+ *
+ * <pre>{@code
+ * TableScan1 --\
+ *               Union -- Sink1
+ * TableScan2 --/
+ *
+ * TableScan3 —— Sink3

Review Comment:
   This sink name should be `Sink2`, right?



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml:
##########
@@ -0,0 +1,243 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+       <TestCase name="testSinkReuse">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source2]])
+
+LogicalSink(table=[default_catalog.default_database.sink2], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+
+LogicalSink(table=[default_catalog.default_database.sink2], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source4]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+   :- TableSourceScan(table=[[default_catalog, default_database, source1]], 
fields=[x, y])
+   +- TableSourceScan(table=[[default_catalog, default_database, source2]], 
fields=[x, y])
+
+Sink(table=[default_catalog.default_database.sink2], fields=[x, y])
++- Union(all=[true], union=[x, y])
+   :- TableSourceScan(table=[[default_catalog, default_database, source3]], 
fields=[x, y])
+   +- TableSourceScan(table=[[default_catalog, default_database, source4]], 
fields=[x, y])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testSinkReuseFromSameSource">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+   :- TableSourceScan(table=[[default_catalog, default_database, source1]], 
fields=[x, y])(reuse_id=[1])
+   +- Reused(reference_id=[1])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testSinkReuseWithDifferentFieldNames">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x1, y1])
++- LogicalProject(x1=[$0], y1=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
filed_name_change_source]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- Union(all=[true], union=[x, y])
+   :- TableSourceScan(table=[[default_catalog, default_database, source1]], 
fields=[x, y])
+   +- TableSourceScan(table=[[default_catalog, default_database, 
filed_name_change_source]], fields=[x1, y1])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testSinkReuseWithOverwrite">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source2]])
+
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, y])
++- LogicalProject(x=[$0], y=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[x, y])

Review Comment:
   I was initially curious about why these sinks couldn't be merged, and discovered that their specs differ. Do you agree that `SinkAbilitySpec` should be reflected in the RelNode plan digest, just like `SourceAbilitySpec` is (e.g. `project=[x], metadata=[]` on `TableSourceScan`)? If so, I can create a JIRA ticket to track this improvement. 🤔



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This checks if we can find duplicate sinks that can be reused. If so, only 
one of these sinks
+ * will be used. This is an optimization so that we do not need to process 
multiple sinks that are
+ * actually representing the same destination table.
+ *
+ * <p>This optimization is only used in the STATEMENT SET clause with multiple 
INSERT INTO.
+ *
+ * <p>Examples in SQL look like:
+ *
+ * <pre>{@code
+ * BEGIN STATEMENT SET;
+ * INSERT INTO sink1 SELECT * FROM source1;
+ * INSERT INTO sink1 SELECT * FROM source2;
+ * INSERT INTO sink2 SELECT * FROM source3;
+ * END;
+ * }</pre>
+ *
+ * <p>The plan is as follows:
+ *
+ * <pre>{@code
+ * TableScan1 —— Sink1
+ * TableScan2 —— Sink1
+ * TableScan3 —— Sink2
+ * }</pre>
+ *
+ * <p>After reused, the plan will be changed as follows:
+ *
+ * <pre>{@code
+ * TableScan1 --\
+ *               Union -- Sink1
+ * TableScan2 --/
+ *
+ * TableScan3 —— Sink3
+ * }</pre>
+ */
+public class SinkReuser {
+
+    public List<RelNode> reuseDuplicatedSink(List<RelNode> relNodes) {
+        // Find all sinks
+        List<Sink> allSinkNodes =
+                relNodes.stream()
+                        .filter(node -> node instanceof Sink)
+                        .map(node -> (Sink) node)
+                        .collect(Collectors.toList());
+        List<ReusableSinkGroup> reusableSinkGroups = 
groupReusableSink(allSinkNodes);
+
+        Set<Sink> reusedSinkNodes = reuseSinkAndAddUnion(reusableSinkGroups);
+
+        // Remove all unused sink nodes
+        return relNodes.stream()
+                .filter(root -> !(root instanceof Sink) || 
reusedSinkNodes.contains(root))
+                .collect(Collectors.toList());
+    }
+
+    private Set<Sink> reuseSinkAndAddUnion(List<ReusableSinkGroup> 
reusableSinkGroups) {
+        final Set<Sink> reusedSinkNodes = Collections.newSetFromMap(new 
IdentityHashMap<>());
+        reusableSinkGroups.forEach(
+                group -> {
+                    List<Sink> originalSinks = group.originalSinks;
+                    if (originalSinks.size() <= 1) {
+                        Preconditions.checkState(originalSinks.size() == 1);
+                        reusedSinkNodes.add(originalSinks.get(0));
+                        return;
+                    }
+                    List<RelNode> allSinkInputs = new ArrayList<>();
+                    for (Sink sinkNode : originalSinks) {
+                        allSinkInputs.add(sinkNode.getInput());
+                    }
+
+                    // Use the first sink node as the final reused sink node
+                    Sink reusedSink = originalSinks.get(0);
+
+                    Union unionForReusedSinks;
+
+                    unionForReusedSinks =
+                            new BatchPhysicalUnion(
+                                    reusedSink.getCluster(),
+                                    group.inputTraitSet,
+                                    allSinkInputs,
+                                    true,
+                                    // use sink input row type
+                                    reusedSink.getRowType());
+
+                    reusedSink.replaceInput(0, unionForReusedSinks);
+                    reusedSinkNodes.add(reusedSink);
+                });
+        return reusedSinkNodes;
+    }
+
+    /**
+     * Grouping sinks that can be reused with each other.
+     *
+     * @param allSinkNodes in the plan.
+     * @return a list contains all grouped sink.
+     */
+    private List<ReusableSinkGroup> groupReusableSink(List<Sink> allSinkNodes) 
{
+        List<ReusableSinkGroup> reusableSinkGroups = new ArrayList<>();
+
+        for (Sink currentSinkNode : allSinkNodes) {
+            Optional<ReusableSinkGroup> targetGroup =
+                    reusableSinkGroups.stream()
+                            .filter(
+                                    reusableSinkGroup ->
+                                            
reusableSinkGroup.canBeReused(currentSinkNode))
+                            .findFirst();
+
+            if (targetGroup.isPresent()) {
+                targetGroup.get().originalSinks.add(currentSinkNode);
+            } else {
+                // If the current sink cannot be reused with any existing 
groups, create a new
+                // group.
+                reusableSinkGroups.add(new ReusableSinkGroup(currentSinkNode));
+            }
+        }
+        return reusableSinkGroups;
+    }
+
+    public String getDigest(Sink sink) {
+        List<String> digest = new ArrayList<>();
+        
digest.add(sink.contextResolvedTable().getIdentifier().asSummaryString());
+
+        int[][] targetColumns = sink.targetColumns();
+        if (targetColumns != null && targetColumns.length > 0) {
+            digest.add(
+                    "targetColumns=["
+                            + Arrays.stream(targetColumns)
+                                    .map(Arrays::toString)
+                                    .collect(Collectors.joining(","))
+                            + "]");
+        }
+
+        String fields =

Review Comment:
   Do we actually need this check? IIUC, if a type cast occurs, there will always be a node such as a `Calc` that performs the cast, so the field types on the sink will always align with the physical columns of the sink table.
   See `DynamicSinkUtils#validateSchemaAndApplyImplicitCast` for more details.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala:
##########
@@ -63,11 +63,18 @@ object SubplanReuser {
     val tableSourceReuseEnabled =
       
tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED)
 
+    val tableSinkReuseEnabled =
+      
tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED)
+
     var newRels = rels
     if (tableSourceReuseEnabled) {
       newRels = new ScanReuser(flinkContext, 
flinkTypeFactory).reuseDuplicatedScan(rels)
     }
 
+    if (tableSinkReuseEnabled && flinkContext.isBatchMode) {
+      newRels = new SinkReuser().reuseDuplicatedSink(rels)

Review Comment:
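   Should this `rels` be replaced with `newRels`? Otherwise the result of 
`ScanReuser` computed just above is discarded, and sink reuse runs on the 
plan without source reuse applied.
   Take the following SQL as an example: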
   ```
   StatementSet statementSet = util.tableEnv().createStatementSet();
   statementSet.addInsertSql("INSERT INTO sink1 (SELECT x,y FROM source1)");
   statementSet.addInsertSql(
           "INSERT INTO sink1 (SELECT x, cast(null as bigint) FROM source1)");
   util.verifyExecPlan(statementSet);
   ```
   With the current `rels`, the plan is (the two scans of source1 are not reused):
   ```
   Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
   +- Union(all=[true], union=[x, y])
      :- TableSourceScan(table=[[default_catalog, default_database, source1]], 
fields=[x, y])
      +- Calc(select=[x, null:BIGINT AS EXPR$1])
         +- TableSourceScan(table=[[default_catalog, default_database, source1, 
project=[x], metadata=[]]], fields=[x])
   ```
   With `newRels`, the plan will be (the scan of source1 is reused):
   ```
   Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
   +- Union(all=[true], union=[x, y])
      :- TableSourceScan(table=[[default_catalog, default_database, source1, 
project=[x, y], metadata=[]]], fields=[x, y])(reuse_id=[1])
      +- Calc(select=[x, null:BIGINT AS EXPR$1])
         +- Reused(reference_id=[1])
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This checks if we can find duplicate sinks that can be reused. If so, only 
one of these sinks
+ * will be used. This is an optimization so that we do not need to process 
multiple sinks that are
+ * actually representing the same destination table.
+ *
+ * <p>This optimization is only used in the STATEMENT SET clause with multiple 
INSERT INTO.
+ *
+ * <p>Examples in SQL look like:
+ *
+ * <pre>{@code
+ * BEGIN STATEMENT SET;
+ * INSERT INTO sink1 SELECT * FROM source1;
+ * INSERT INTO sink1 SELECT * FROM source2;
+ * INSERT INTO sink2 SELECT * FROM source3;
+ * END;
+ * }</pre>
+ *
+ * <p>The plan is as follows:
+ *
+ * <pre>{@code
+ * TableScan1 —— Sink1
+ * TableScan2 —— Sink1
+ * TableScan3 —— Sink2
+ * }</pre>
+ *
+ * <p>After reused, the plan will be changed as follows:
+ *
+ * <pre>{@code
+ * TableScan1 --\
+ *               Union -- Sink1
+ * TableScan2 --/
+ *
+ * TableScan3 —— Sink3
+ * }</pre>
+ */
+public class SinkReuser {
+
+    public List<RelNode> reuseDuplicatedSink(List<RelNode> relNodes) {
+        // Find all sinks
+        List<Sink> allSinkNodes =
+                relNodes.stream()
+                        .filter(node -> node instanceof Sink)
+                        .map(node -> (Sink) node)
+                        .collect(Collectors.toList());
+        List<ReusableSinkGroup> reusableSinkGroups = 
groupReusableSink(allSinkNodes);
+
+        Set<Sink> reusedSinkNodes = reuseSinkAndAddUnion(reusableSinkGroups);
+
+        // Remove all unused sink nodes
+        return relNodes.stream()
+                .filter(root -> !(root instanceof Sink) || 
reusedSinkNodes.contains(root))
+                .collect(Collectors.toList());
+    }
+
+    private Set<Sink> reuseSinkAndAddUnion(List<ReusableSinkGroup> 
reusableSinkGroups) {
+        final Set<Sink> reusedSinkNodes = Collections.newSetFromMap(new 
IdentityHashMap<>());
+        reusableSinkGroups.forEach(
+                group -> {
+                    List<Sink> originalSinks = group.originalSinks;
+                    if (originalSinks.size() <= 1) {
+                        Preconditions.checkState(originalSinks.size() == 1);
+                        reusedSinkNodes.add(originalSinks.get(0));
+                        return;
+                    }
+                    List<RelNode> allSinkInputs = new ArrayList<>();
+                    for (Sink sinkNode : originalSinks) {
+                        allSinkInputs.add(sinkNode.getInput());
+                    }
+
+                    // Use the first sink node as the final reused sink node
+                    Sink reusedSink = originalSinks.get(0);
+
+                    Union unionForReusedSinks;
+
+                    unionForReusedSinks =
+                            new BatchPhysicalUnion(
+                                    reusedSink.getCluster(),
+                                    group.inputTraitSet,
+                                    allSinkInputs,
+                                    true,
+                                    // use sink input row type
+                                    reusedSink.getRowType());
+
+                    reusedSink.replaceInput(0, unionForReusedSinks);
+                    reusedSinkNodes.add(reusedSink);
+                });
+        return reusedSinkNodes;
+    }
+
+    /**
+     * Grouping sinks that can be reused with each other.
+     *
+     * @param allSinkNodes in the plan.
+     * @return a list contains all grouped sink.
+     */
+    private List<ReusableSinkGroup> groupReusableSink(List<Sink> allSinkNodes) 
{
+        List<ReusableSinkGroup> reusableSinkGroups = new ArrayList<>();
+
+        for (Sink currentSinkNode : allSinkNodes) {
+            Optional<ReusableSinkGroup> targetGroup =
+                    reusableSinkGroups.stream()
+                            .filter(
+                                    reusableSinkGroup ->
+                                            
reusableSinkGroup.canBeReused(currentSinkNode))
+                            .findFirst();
+
+            if (targetGroup.isPresent()) {
+                targetGroup.get().originalSinks.add(currentSinkNode);
+            } else {
+                // If the current sink cannot be reused with any existing 
groups, create a new
+                // group.
+                reusableSinkGroups.add(new ReusableSinkGroup(currentSinkNode));
+            }
+        }
+        return reusableSinkGroups;
+    }
+
+    public String getDigest(Sink sink) {
+        List<String> digest = new ArrayList<>();
+        
digest.add(sink.contextResolvedTable().getIdentifier().asSummaryString());
+
+        int[][] targetColumns = sink.targetColumns();
+        if (targetColumns != null && targetColumns.length > 0) {
+            digest.add(
+                    "targetColumns=["
+                            + Arrays.stream(targetColumns)
+                                    .map(Arrays::toString)
+                                    .collect(Collectors.joining(","))
+                            + "]");
+        }
+
+        String fields =
+                sink.getRowType().getFieldList().stream()
+                        .map(f -> f.getType().toString())
+                        .collect(Collectors.joining(", "));
+        digest.add("fields=[" + fields + "]");

Review Comment:
   The variable is named `fields`, but its value holds the types of these 
fields, which is confusing.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This checks if we can find duplicate sinks that can be reused. If so, only 
one of these sinks
+ * will be used. This is an optimization so that we do not need to process 
multiple sinks that are
+ * actually representing the same destination table.
+ *
+ * <p>This optimization is only used in the STATEMENT SET clause with multiple 
INSERT INTO.
+ *
+ * <p>Examples in SQL look like:
+ *
+ * <pre>{@code
+ * BEGIN STATEMENT SET;
+ * INSERT INTO sink1 SELECT * FROM source1;
+ * INSERT INTO sink1 SELECT * FROM source2;
+ * INSERT INTO sink2 SELECT * FROM source3;
+ * END;
+ * }</pre>
+ *
+ * <p>The plan is as follows:
+ *
+ * <pre>{@code
+ * TableScan1 —— Sink1
+ * TableScan2 —— Sink1
+ * TableScan3 —— Sink2
+ * }</pre>
+ *
+ * <p>After reused, the plan will be changed as follows:
+ *
+ * <pre>{@code
+ * TableScan1 --\
+ *               Union -- Sink1
+ * TableScan2 --/
+ *
+ * TableScan3 —— Sink3
+ * }</pre>
+ */
+public class SinkReuser {
+
+    public List<RelNode> reuseDuplicatedSink(List<RelNode> relNodes) {
+        // Find all sinks
+        List<Sink> allSinkNodes =
+                relNodes.stream()
+                        .filter(node -> node instanceof Sink)
+                        .map(node -> (Sink) node)
+                        .collect(Collectors.toList());
+        List<ReusableSinkGroup> reusableSinkGroups = 
groupReusableSink(allSinkNodes);
+
+        Set<Sink> reusedSinkNodes = reuseSinkAndAddUnion(reusableSinkGroups);
+
+        // Remove all unused sink nodes
+        return relNodes.stream()
+                .filter(root -> !(root instanceof Sink) || 
reusedSinkNodes.contains(root))
+                .collect(Collectors.toList());
+    }
+
+    private Set<Sink> reuseSinkAndAddUnion(List<ReusableSinkGroup> 
reusableSinkGroups) {
+        final Set<Sink> reusedSinkNodes = Collections.newSetFromMap(new 
IdentityHashMap<>());
+        reusableSinkGroups.forEach(
+                group -> {
+                    List<Sink> originalSinks = group.originalSinks;
+                    if (originalSinks.size() <= 1) {
+                        Preconditions.checkState(originalSinks.size() == 1);
+                        reusedSinkNodes.add(originalSinks.get(0));
+                        return;
+                    }
+                    List<RelNode> allSinkInputs = new ArrayList<>();
+                    for (Sink sinkNode : originalSinks) {
+                        allSinkInputs.add(sinkNode.getInput());
+                    }
+
+                    // Use the first sink node as the final reused sink node
+                    Sink reusedSink = originalSinks.get(0);
+
+                    Union unionForReusedSinks;
+
+                    unionForReusedSinks =
+                            new BatchPhysicalUnion(
+                                    reusedSink.getCluster(),
+                                    group.inputTraitSet,
+                                    allSinkInputs,
+                                    true,
+                                    // use sink input row type
+                                    reusedSink.getRowType());
+
+                    reusedSink.replaceInput(0, unionForReusedSinks);
+                    reusedSinkNodes.add(reusedSink);
+                });
+        return reusedSinkNodes;
+    }
+
+    /**
+     * Grouping sinks that can be reused with each other.
+     *
+     * @param allSinkNodes in the plan.
+     * @return a list contains all grouped sink.
+     */
+    private List<ReusableSinkGroup> groupReusableSink(List<Sink> allSinkNodes) 
{
+        List<ReusableSinkGroup> reusableSinkGroups = new ArrayList<>();
+
+        for (Sink currentSinkNode : allSinkNodes) {
+            Optional<ReusableSinkGroup> targetGroup =
+                    reusableSinkGroups.stream()
+                            .filter(
+                                    reusableSinkGroup ->
+                                            
reusableSinkGroup.canBeReused(currentSinkNode))
+                            .findFirst();
+
+            if (targetGroup.isPresent()) {
+                targetGroup.get().originalSinks.add(currentSinkNode);
+            } else {
+                // If the current sink cannot be reused with any existing 
groups, create a new
+                // group.
+                reusableSinkGroups.add(new ReusableSinkGroup(currentSinkNode));
+            }
+        }
+        return reusableSinkGroups;
+    }
+
+    public String getDigest(Sink sink) {

Review Comment:
   nit: this method can be `private`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to