lincoln-lil commented on code in PR #26648:
URL: https://github.com/apache/flink/pull/26648#discussion_r2137376644


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala:
##########
@@ -69,6 +69,9 @@ class StreamCommonSubGraphBasedOptimizer(planner: 
StreamPlanner)
             MiniBatchIntervalTrait.NONE.getMiniBatchInterval
           }
         sinkBlock.setMiniBatchInterval(miniBatchInterval)
+
+        // allow to produce duplicate changes by default
+        sinkBlock.setAllowDuplicateChanges(true)

Review Comment:
   Since producing duplicate changes is not the default behavior of current
operators (only a few operators support it), I think we should set this to
`false` by default here.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala:
##########
@@ -182,6 +183,21 @@ class RelTreeWriterImpl(
       }
     }
 
+    if (withDuplicateChangesTrait) {
+      rel match {
+        case streamRel: StreamPhysicalRel =>
+          val duplicateChanges = 
DuplicateChangesUtils.getDuplicateChanges(streamRel)
+          val stringifyDuplicateChanges: String =
+            if (duplicateChanges.isEmpty) {
+              "EMPTY"

Review Comment:
   It's recommended to skip empty values; printing `EMPTY` might be confusing
to some extent.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DuplicateChangesUtils.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
+import org.apache.flink.table.planner.plan.trait.DuplicateChanges;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef;
+
+import java.util.Optional;
+
+/** Utils for {@link DuplicateChanges}. */
+public class DuplicateChangesUtils {
+
+    private DuplicateChangesUtils() {}
+
+    /**
+     * Get an optional {@link DuplicateChanges} from the given {@link 
StreamPhysicalRel}.
+     *
+     * <p>The {@link DuplicateChanges} is inferred from {@link 
DuplicateChangesTrait}.
+     */
+    public static Optional<DuplicateChanges> 
getDuplicateChanges(StreamPhysicalRel rel) {
+        Optional<DuplicateChangesTrait> duplicateChangesTraitOp =
+                
Optional.ofNullable(rel.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE));
+        return duplicateChangesTraitOp.stream()
+                .map(DuplicateChangesTrait::getDuplicateChanges)
+                .findFirst();
+    }
+
+    /**
+     * Merge the given two {@link DuplicateChanges} as a new one.
+     *
+     * <p>The logic matrix is following:
+     *
+     * <pre>
+     *       +-------------+-------------+---------------+
+     *       | origin_1    | origin_2    | merge result  |

Review Comment:
   nit: the table is not aligned.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala:
##########
@@ -270,10 +291,25 @@ class StreamCommonSubGraphBasedOptimizer(planner: 
StreamPlanner)
           // propagate updateKind trait to child block
           val requireUB = updateKindTrait.updateKind == 
UpdateKind.BEFORE_AND_AFTER
           childBlock.setUpdateBeforeRequired(requireUB || 
childBlock.isUpdateBeforeRequired)
+          // propagate duplicateChanges trait to child block
+          val allowDuplicateChanges = 
isAllowDuplicateChanges(duplicateChangesTrait)
+          childBlock.setAllowDuplicateChanges(allowDuplicateChanges)
         }
       case ser: StreamPhysicalRel => ser.getInputs.foreach(e => 
propagateTraits(e))
       case _ => // do nothing
     }
+
+    def isAllowDuplicateChanges(duplicateChangesTrait: DuplicateChangesTrait): 
Boolean = {
+      duplicateChangesTrait.getDuplicateChanges match {
+        case DuplicateChanges.ALLOW => true
+        case DuplicateChanges.DISALLOW => false
+        case DuplicateChanges.NONE => true
+        case _ =>

Review Comment:
   The `case _` branch looks unnecessary, and we can also merge `NONE` and `ALLOW` into
a single case.
   



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.legacy.table.sinks.RetractStreamTableSink;
+import org.apache.flink.legacy.table.sinks.StreamTableSink;
+import org.apache.flink.legacy.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.legacy.sinks.TableSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.trait.DuplicateChanges;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef;
+import org.apache.flink.table.planner.plan.utils.DuplicateChangesUtils;
+import org.apache.flink.table.planner.sinks.DataStreamTableSink;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexUtil;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A rule that infers the {@link DuplicateChanges} for each {@link 
StreamPhysicalRel} node.
+ *
+ * <p>The derivation of the trait {@link DuplicateChanges} flows from the root 
to the leaf, that is,
+ * from the sink to the source.
+ *
+ * <p>Notes: This rule only supports HepPlanner with TOP_DOWN match order.
+ */
+@Internal
+@Value.Enclosing
+public class DuplicateChangesInferRule extends 
RelRule<DuplicateChangesInferRule.Config> {
+
+    public static final DuplicateChangesInferRule INSTANCE =
+            DuplicateChangesInferRule.Config.DEFAULT.toRule();
+
+    protected DuplicateChangesInferRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        StreamPhysicalRel rel = call.rel(0);
+        DuplicateChangesTrait parentTrait =
+                rel.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE);
+        Preconditions.checkState(parentTrait != null);
+
+        DuplicateChangesTrait requiredTrait;
+        if (rel instanceof StreamPhysicalSink) {
+            boolean canConsumeDuplicateChanges =
+                    canConsumeDuplicateChanges((StreamPhysicalSink) rel);
+            boolean isMaterialized = ((StreamPhysicalSink) 
rel).upsertMaterialize();
+            requiredTrait =
+                    canConsumeDuplicateChanges && !isMaterialized
+                            ? DuplicateChangesTrait.ALLOW
+                            : DuplicateChangesTrait.DISALLOW;
+        } else if (rel instanceof StreamPhysicalLegacySink) {
+            boolean canConsumeDuplicateChanges =
+                    canConsumeDuplicateChanges((StreamPhysicalLegacySink<?>) 
rel);
+            requiredTrait =
+                    canConsumeDuplicateChanges
+                            ? DuplicateChangesTrait.ALLOW
+                            : DuplicateChangesTrait.DISALLOW;
+        } else if (rel instanceof StreamPhysicalCalcBase) {
+            // if the calc contains non-deterministic fields, we should not 
allow duplicate
+            if 
(existNonDeterministicFiltersOrProjections((StreamPhysicalCalcBase) rel)) {
+                requiredTrait = DuplicateChangesTrait.DISALLOW;
+            } else {
+                requiredTrait = parentTrait;
+            }
+        } else if (rel instanceof StreamPhysicalExchange
+                || rel instanceof StreamPhysicalMiniBatchAssigner
+                || rel instanceof StreamPhysicalWatermarkAssigner
+                || rel instanceof StreamPhysicalDropUpdateBefore
+                || rel instanceof StreamPhysicalTableSourceScan
+                || rel instanceof StreamPhysicalDataStreamScan
+                || rel instanceof StreamPhysicalLegacyTableSourceScan
+                || rel instanceof StreamPhysicalIntermediateTableScan) {
+            // forward parent requirement
+            requiredTrait = parentTrait;
+        } else {
+            // if not explicitly supported, all operators should not accept 
duplicate changes
+            // TODO consider more operators to support consuming duplicate 
changes
+            requiredTrait = DuplicateChangesTrait.DISALLOW;
+        }
+
+        boolean anyInputUpdated = false;
+        List<RelNode> inputs = getInputs(rel);
+        List<RelNode> newInputs = new ArrayList<>();
+        for (RelNode input : inputs) {
+            DuplicateChangesTrait inputOriginalTrait =
+                    
input.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE);
+            if (!requiredTrait.equals(inputOriginalTrait)) {
+                DuplicateChangesTrait mergedTrait =
+                        getMergedDuplicateChangesTrait(inputOriginalTrait, 
requiredTrait);
+                RelNode newInput =
+                        input.copy(input.getTraitSet().plus(mergedTrait), 
input.getInputs());
+                newInputs.add(newInput);
+                anyInputUpdated = true;
+            } else {
+                newInputs.add(input);
+            }
+        }
+        // update parent if a child was updated
+        if (anyInputUpdated) {
+            RelNode newRel = rel.copy(rel.getTraitSet(), newInputs);
+            call.transformTo(newRel);
+        }
+    }
+
+    private static DuplicateChangesTrait getMergedDuplicateChangesTrait(
+            DuplicateChangesTrait inputOriginalTrait, DuplicateChangesTrait 
newRequiredTrait) {
+        DuplicateChangesTrait mergedTrait;
+        if (inputOriginalTrait == null) {
+            mergedTrait = newRequiredTrait;
+        } else {
+            DuplicateChanges mergedDuplicateChanges =
+                    DuplicateChangesUtils.mergeDuplicateChanges(
+                            inputOriginalTrait.getDuplicateChanges(),
+                            newRequiredTrait.getDuplicateChanges());
+            mergedTrait = new DuplicateChangesTrait(mergedDuplicateChanges);
+        }
+        return mergedTrait;
+    }
+
+    private boolean 
existNonDeterministicFiltersOrProjections(StreamPhysicalCalcBase calc) {
+        RexProgram calcProgram = calc.getProgram();
+        // all projections and conditions should be expanded and checked
+        return 
!calcProgram.getExprList().stream().allMatch(RexUtil::isDeterministic);
+    }
+
+    private boolean canConsumeDuplicateChanges(StreamPhysicalSink sink) {
+        boolean onlyAcceptInsertOnly = false;

Review Comment:
   Using a positive name such as `acceptUpdates` or `acceptChanges` may be easier to read (then the return clause
will be `acceptUpdates && ...`).



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkDuplicateChangesTraitInitProgram.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeProgram;
+import 
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.plan.trait.DuplicateChanges;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait;
+
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * A {@link FlinkOptimizeProgram} that does some initialization for {@link 
DuplicateChanges}

Review Comment:
   nit: inferences -> inference



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.legacy.table.sinks.RetractStreamTableSink;
+import org.apache.flink.legacy.table.sinks.StreamTableSink;
+import org.apache.flink.legacy.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.legacy.sinks.TableSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.trait.DuplicateChanges;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef;
+import org.apache.flink.table.planner.plan.utils.DuplicateChangesUtils;
+import org.apache.flink.table.planner.sinks.DataStreamTableSink;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexUtil;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A rule that infers the {@link DuplicateChanges} for each {@link 
StreamPhysicalRel} node.
+ *
+ * <p>The derivation of the trait {@link DuplicateChanges} flows from the root 
to the leaf, that is,
+ * from the sink to the source.
+ *
+ * <p>Notes: This rule only supports HepPlanner with TOP_DOWN match order.

Review Comment:
   nit: match -> matching



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DuplicateChanges.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.trait;
+
+/**
+ * Lists all kinds of all behaviors to describe whether the node can produce 
duplicated changes for

Review Comment:
   -> 'Lists all kinds of behaviors'



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala:
##########
@@ -139,6 +141,16 @@ class RelNodeBlock(val outputNode: RelNode) {
 
   def getMiniBatchInterval: MiniBatchInterval = miniBatchInterval
 
+  def setAllowDuplicateChanges(allowDuplicateChanges: Boolean): Unit = {
+    // a child block may have multiple parents (outputs), if one of the 
parents could not
+    // accept duplicated changes, then this child block doesn't allow to 
produce duplicate changes
+    if (this.allowDuplicateChanges) {
+      this.allowDuplicateChanges = allowDuplicateChanges
+    }
+  }
+
+  def isAllowDuplicateChanges: Boolean = allowDuplicateChanges

Review Comment:
   nit: using `allowDuplicateChanges: Boolean` is simpler



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.legacy.table.sinks.RetractStreamTableSink;
+import org.apache.flink.legacy.table.sinks.StreamTableSink;
+import org.apache.flink.legacy.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.legacy.sinks.TableSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.trait.DuplicateChanges;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef;
+import org.apache.flink.table.planner.plan.utils.DuplicateChangesUtils;
+import org.apache.flink.table.planner.sinks.DataStreamTableSink;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexUtil;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A rule that infers the {@link DuplicateChanges} for each {@link 
StreamPhysicalRel} node.
+ *
+ * <p>The derivation of the trait {@link DuplicateChanges} flows from the root 
to the leaf, that is,
+ * from the sink to the source.
+ *
+ * <p>Notes: This rule only supports HepPlanner with TOP_DOWN match order.
+ */
+@Internal
+@Value.Enclosing
+public class DuplicateChangesInferRule extends 
RelRule<DuplicateChangesInferRule.Config> {
+
+    public static final DuplicateChangesInferRule INSTANCE =
+            DuplicateChangesInferRule.Config.DEFAULT.toRule();
+
+    protected DuplicateChangesInferRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        StreamPhysicalRel rel = call.rel(0);
+        DuplicateChangesTrait parentTrait =
+                rel.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE);
+        Preconditions.checkState(parentTrait != null);
+
+        DuplicateChangesTrait requiredTrait;
+        if (rel instanceof StreamPhysicalSink) {
+            boolean canConsumeDuplicateChanges =
+                    canConsumeDuplicateChanges((StreamPhysicalSink) rel);
+            boolean isMaterialized = ((StreamPhysicalSink) 
rel).upsertMaterialize();
+            requiredTrait =
+                    canConsumeDuplicateChanges && !isMaterialized
+                            ? DuplicateChangesTrait.ALLOW
+                            : DuplicateChangesTrait.DISALLOW;
+        } else if (rel instanceof StreamPhysicalLegacySink) {
+            boolean canConsumeDuplicateChanges =
+                    canConsumeDuplicateChanges((StreamPhysicalLegacySink<?>) 
rel);
+            requiredTrait =
+                    canConsumeDuplicateChanges
+                            ? DuplicateChangesTrait.ALLOW
+                            : DuplicateChangesTrait.DISALLOW;
+        } else if (rel instanceof StreamPhysicalCalcBase) {
+            // if the calc contains non-deterministic fields, we should not 
allow duplicate
+            if 
(existNonDeterministicFiltersOrProjections((StreamPhysicalCalcBase) rel)) {
+                requiredTrait = DuplicateChangesTrait.DISALLOW;
+            } else {
+                requiredTrait = parentTrait;
+            }
+        } else if (rel instanceof StreamPhysicalExchange
+                || rel instanceof StreamPhysicalMiniBatchAssigner
+                || rel instanceof StreamPhysicalWatermarkAssigner
+                || rel instanceof StreamPhysicalDropUpdateBefore
+                || rel instanceof StreamPhysicalTableSourceScan
+                || rel instanceof StreamPhysicalDataStreamScan
+                || rel instanceof StreamPhysicalLegacyTableSourceScan
+                || rel instanceof StreamPhysicalIntermediateTableScan) {
+            // forward parent requirement
+            requiredTrait = parentTrait;
+        } else {
+            // if not explicitly supported, all operators should not accept 
duplicate changes
+            // TODO consider more operators to support consuming duplicate 
changes
+            requiredTrait = DuplicateChangesTrait.DISALLOW;
+        }
+
+        boolean anyInputUpdated = false;
+        List<RelNode> inputs = getInputs(rel);
+        List<RelNode> newInputs = new ArrayList<>();
+        for (RelNode input : inputs) {
+            DuplicateChangesTrait inputOriginalTrait =
+                    
input.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE);
+            if (!requiredTrait.equals(inputOriginalTrait)) {
+                DuplicateChangesTrait mergedTrait =
+                        getMergedDuplicateChangesTrait(inputOriginalTrait, 
requiredTrait);
+                RelNode newInput =
+                        input.copy(input.getTraitSet().plus(mergedTrait), 
input.getInputs());
+                newInputs.add(newInput);
+                anyInputUpdated = true;
+            } else {
+                newInputs.add(input);
+            }
+        }
+        // update parent if a child was updated
+        if (anyInputUpdated) {
+            RelNode newRel = rel.copy(rel.getTraitSet(), newInputs);
+            call.transformTo(newRel);
+        }
+    }
+
+    private static DuplicateChangesTrait getMergedDuplicateChangesTrait(

Review Comment:
   -> 'mergeDuplicateChangesTrait'



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.legacy.table.sinks.RetractStreamTableSink;
+import org.apache.flink.legacy.table.sinks.StreamTableSink;
+import org.apache.flink.legacy.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.legacy.sinks.TableSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.trait.DuplicateChanges;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef;
+import org.apache.flink.table.planner.plan.utils.DuplicateChangesUtils;
+import org.apache.flink.table.planner.sinks.DataStreamTableSink;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexUtil;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A rule that infers the {@link DuplicateChanges} for each {@link 
StreamPhysicalRel} node.
+ *
+ * <p>The derivation of the trait {@link DuplicateChanges} flows from the root 
to the leaf, that is,
+ * from the sink to the source.
+ *
+ * <p>Notes: This rule only supports HepPlanner with TOP_DOWN match order.
+ */
+@Internal
+@Value.Enclosing
+public class DuplicateChangesInferRule extends 
RelRule<DuplicateChangesInferRule.Config> {
+
+    public static final DuplicateChangesInferRule INSTANCE =
+            DuplicateChangesInferRule.Config.DEFAULT.toRule();
+
+    protected DuplicateChangesInferRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        StreamPhysicalRel rel = call.rel(0);
+        DuplicateChangesTrait parentTrait =
+                rel.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE);
+        Preconditions.checkState(parentTrait != null);
+
+        DuplicateChangesTrait requiredTrait;
+        if (rel instanceof StreamPhysicalSink) {
+            boolean canConsumeDuplicateChanges =
+                    canConsumeDuplicateChanges((StreamPhysicalSink) rel);
+            boolean isMaterialized = ((StreamPhysicalSink) 
rel).upsertMaterialize();
+            requiredTrait =
+                    canConsumeDuplicateChanges && !isMaterialized
+                            ? DuplicateChangesTrait.ALLOW
+                            : DuplicateChangesTrait.DISALLOW;
+        } else if (rel instanceof StreamPhysicalLegacySink) {
+            boolean canConsumeDuplicateChanges =
+                    canConsumeDuplicateChanges((StreamPhysicalLegacySink<?>) 
rel);
+            requiredTrait =
+                    canConsumeDuplicateChanges
+                            ? DuplicateChangesTrait.ALLOW
+                            : DuplicateChangesTrait.DISALLOW;
+        } else if (rel instanceof StreamPhysicalCalcBase) {
+            // if the calc contains non-deterministic fields, we should not 
allow duplicate
+            if 
(existNonDeterministicFiltersOrProjections((StreamPhysicalCalcBase) rel)) {
+                requiredTrait = DuplicateChangesTrait.DISALLOW;
+            } else {
+                requiredTrait = parentTrait;
+            }
+        } else if (rel instanceof StreamPhysicalExchange
+                || rel instanceof StreamPhysicalMiniBatchAssigner
+                || rel instanceof StreamPhysicalWatermarkAssigner
+                || rel instanceof StreamPhysicalDropUpdateBefore
+                || rel instanceof StreamPhysicalTableSourceScan
+                || rel instanceof StreamPhysicalDataStreamScan
+                || rel instanceof StreamPhysicalLegacyTableSourceScan
+                || rel instanceof StreamPhysicalIntermediateTableScan) {
+            // forward parent requirement
+            requiredTrait = parentTrait;
+        } else {
+            // if not explicitly supported, all operators should not accept 
duplicate changes
+            // TODO consider more operators to support consuming duplicate 
changes
+            requiredTrait = DuplicateChangesTrait.DISALLOW;
+        }
+
+        boolean anyInputUpdated = false;
+        List<RelNode> inputs = getInputs(rel);
+        List<RelNode> newInputs = new ArrayList<>();
+        for (RelNode input : inputs) {
+            DuplicateChangesTrait inputOriginalTrait =
+                    
input.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE);
+            if (!requiredTrait.equals(inputOriginalTrait)) {
+                DuplicateChangesTrait mergedTrait =
+                        getMergedDuplicateChangesTrait(inputOriginalTrait, 
requiredTrait);
+                RelNode newInput =
+                        input.copy(input.getTraitSet().plus(mergedTrait), 
input.getInputs());
+                newInputs.add(newInput);
+                anyInputUpdated = true;
+            } else {
+                newInputs.add(input);
+            }
+        }
+        // update parent if a child was updated
+        if (anyInputUpdated) {
+            RelNode newRel = rel.copy(rel.getTraitSet(), newInputs);
+            call.transformTo(newRel);
+        }
+    }
+
+    private static DuplicateChangesTrait getMergedDuplicateChangesTrait(
+            DuplicateChangesTrait inputOriginalTrait, DuplicateChangesTrait 
newRequiredTrait) {
+        DuplicateChangesTrait mergedTrait;
+        if (inputOriginalTrait == null) {
+            mergedTrait = newRequiredTrait;
+        } else {
+            DuplicateChanges mergedDuplicateChanges =
+                    DuplicateChangesUtils.mergeDuplicateChanges(
+                            inputOriginalTrait.getDuplicateChanges(),
+                            newRequiredTrait.getDuplicateChanges());
+            mergedTrait = new DuplicateChangesTrait(mergedDuplicateChanges);
+        }
+        return mergedTrait;
+    }
+
+    private boolean 
existNonDeterministicFiltersOrProjections(StreamPhysicalCalcBase calc) {
+        RexProgram calcProgram = calc.getProgram();
+        // all projections and conditions should be expanded and checked
+        return 
!calcProgram.getExprList().stream().allMatch(RexUtil::isDeterministic);

Review Comment:
   This can be simplified to `FlinkRexUtil.isDeterministic(calc.getProgram())`.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala:
##########
@@ -139,6 +141,16 @@ class RelNodeBlock(val outputNode: RelNode) {
 
   def getMiniBatchInterval: MiniBatchInterval = miniBatchInterval
 
+  def setAllowDuplicateChanges(allowDuplicateChanges: Boolean): Unit = {

Review Comment:
   Based on the logic, it might be more appropriate to name this method `mergeXx` rather than making it a setter.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DuplicateChangesUtils.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
+import org.apache.flink.table.planner.plan.trait.DuplicateChanges;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait;
+import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef;
+
+import java.util.Optional;
+
+/** Utils for {@link DuplicateChanges}. */
+public class DuplicateChangesUtils {
+
+    private DuplicateChangesUtils() {}
+
+    /**
+     * Get an optional {@link DuplicateChanges} from the given {@link 
StreamPhysicalRel}.
+     *
+     * <p>The {@link DuplicateChanges} is inferred from {@link 
DuplicateChangesTrait}.
+     */
+    public static Optional<DuplicateChanges> 
getDuplicateChanges(StreamPhysicalRel rel) {
+        Optional<DuplicateChangesTrait> duplicateChangesTraitOp =
+                
Optional.ofNullable(rel.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE));
+        return duplicateChangesTraitOp.stream()
+                .map(DuplicateChangesTrait::getDuplicateChanges)
+                .findFirst();
+    }
+
+    /**
+     * Merge the given two {@link DuplicateChanges} as a new one.
+     *
+     * <p>The logic matrix is following:
+     *
+     * <pre>
+     *       +-------------+-------------+---------------+
+     *       | origin_1    | origin_2    | merge result  |
+     *       +-------------+-------------+---------------+
+     *       | NONE        | `ANY`       |   `ANY`      |

Review Comment:
   What does `ANY` stand for? I didn't see any related logic in the method.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala:
##########
@@ -105,6 +105,8 @@ class RelNodeBlock(val outputNode: RelNode) {
 
   private var miniBatchInterval: MiniBatchInterval = MiniBatchInterval.NONE
 
+  private var allowDuplicateChanges: Boolean = true

Review Comment:
   Using `false` as the default value would be more intuitive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to