lincoln-lil commented on code in PR #26648: URL: https://github.com/apache/flink/pull/26648#discussion_r2137376644
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala: ########## @@ -69,6 +69,9 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner) MiniBatchIntervalTrait.NONE.getMiniBatchInterval } sinkBlock.setMiniBatchInterval(miniBatchInterval) + + // allow to produce duplicate changes by default + sinkBlock.setAllowDuplicateChanges(true) Review Comment: Since duplicate changes are not the default behavior of current operators (only a few operators do support this), I think we should set it to `false` by default here. ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala: ########## @@ -182,6 +183,21 @@ class RelTreeWriterImpl( } } + if (withDuplicateChangesTrait) { + rel match { + case streamRel: StreamPhysicalRel => + val duplicateChanges = DuplicateChangesUtils.getDuplicateChanges(streamRel) + val stringifyDuplicateChanges: String = + if (duplicateChanges.isEmpty) { + "EMPTY" Review Comment: It's recommended to skip empty values; the `EMPTY` value might be confusing to some extent. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DuplicateChangesUtils.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.utils; + +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel; +import org.apache.flink.table.planner.plan.trait.DuplicateChanges; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef; + +import java.util.Optional; + +/** Utils for {@link DuplicateChanges}. */ +public class DuplicateChangesUtils { + + private DuplicateChangesUtils() {} + + /** + * Get an optional {@link DuplicateChanges} from the given {@link StreamPhysicalRel}. + * + * <p>The {@link DuplicateChanges} is inferred from {@link DuplicateChangesTrait}. + */ + public static Optional<DuplicateChanges> getDuplicateChanges(StreamPhysicalRel rel) { + Optional<DuplicateChangesTrait> duplicateChangesTraitOp = + Optional.ofNullable(rel.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE)); + return duplicateChangesTraitOp.stream() + .map(DuplicateChangesTrait::getDuplicateChanges) + .findFirst(); + } + + /** + * Merge the given two {@link DuplicateChanges} as a new one. + * + * <p>The logic matrix is following: + * + * <pre> + * +-------------+-------------+---------------+ + * | origin_1 | origin_2 | merge result | Review Comment: nit: the table is not aligned.
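As an illustration of the "skip empty values" suggestion for RelTreeWriterImpl above, a minimal Scala sketch is shown below. The `appendTrait` callback and the wrapping object are hypothetical stand-ins, since the writer's actual output buffer is not visible in the quoted diff.

```scala
import org.apache.calcite.rel.RelNode
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel
import org.apache.flink.table.planner.plan.utils.DuplicateChangesUtils

object DuplicateChangesTraitPrinting {

  // Append the duplicate-changes trait only when it is present, so the plan dump never
  // shows a placeholder such as "EMPTY" for nodes that do not carry the trait.
  def appendDuplicateChangesTrait(rel: RelNode, appendTrait: (String, String) => Unit): Unit =
    rel match {
      case streamRel: StreamPhysicalRel =>
        val duplicateChanges = DuplicateChangesUtils.getDuplicateChanges(streamRel)
        if (duplicateChanges.isPresent) {
          appendTrait("duplicateChanges", duplicateChanges.get.toString)
        }
      case _ => // non-stream nodes carry no DuplicateChangesTrait; print nothing
    }
}
```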
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala: ########## @@ -270,10 +291,25 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner) // propagate updateKind trait to child block val requireUB = updateKindTrait.updateKind == UpdateKind.BEFORE_AND_AFTER childBlock.setUpdateBeforeRequired(requireUB || childBlock.isUpdateBeforeRequired) + // propagate duplicateChanges trait to child block + val allowDuplicateChanges = isAllowDuplicateChanges(duplicateChangesTrait) + childBlock.setAllowDuplicateChanges(allowDuplicateChanges) } case ser: StreamPhysicalRel => ser.getInputs.foreach(e => propagateTraits(e)) case _ => // do nothing } + + def isAllowDuplicateChanges(duplicateChangesTrait: DuplicateChangesTrait): Boolean = { + duplicateChangesTrait.getDuplicateChanges match { + case DuplicateChanges.ALLOW => true + case DuplicateChanges.DISALLOW => false + case DuplicateChanges.NONE => true + case _ => Review Comment: This branch looks unnecessary, and we can also merge `NONE` and `ALLOW` into a single case. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.legacy.table.sinks.RetractStreamTableSink; +import org.apache.flink.legacy.table.sinks.StreamTableSink; +import org.apache.flink.legacy.table.sinks.UpsertStreamTableSink; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.legacy.sinks.TableSink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.trait.DuplicateChanges; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef; +import org.apache.flink.table.planner.plan.utils.DuplicateChangesUtils; +import org.apache.flink.table.planner.sinks.DataStreamTableSink; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexUtil; +import org.immutables.value.Value; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A rule that infers the {@link DuplicateChanges} for each {@link StreamPhysicalRel} node. + * + * <p>The derivation of the trait {@link DuplicateChanges} flows from the root to the leaf, that is, + * from the sink to the source. + * + * <p>Notes: This rule only supports HepPlanner with TOP_DOWN match order. 
+ */ +@Internal +@Value.Enclosing +public class DuplicateChangesInferRule extends RelRule<DuplicateChangesInferRule.Config> { + + public static final DuplicateChangesInferRule INSTANCE = + DuplicateChangesInferRule.Config.DEFAULT.toRule(); + + protected DuplicateChangesInferRule(Config config) { + super(config); + } + + @Override + public void onMatch(RelOptRuleCall call) { + StreamPhysicalRel rel = call.rel(0); + DuplicateChangesTrait parentTrait = + rel.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE); + Preconditions.checkState(parentTrait != null); + + DuplicateChangesTrait requiredTrait; + if (rel instanceof StreamPhysicalSink) { + boolean canConsumeDuplicateChanges = + canConsumeDuplicateChanges((StreamPhysicalSink) rel); + boolean isMaterialized = ((StreamPhysicalSink) rel).upsertMaterialize(); + requiredTrait = + canConsumeDuplicateChanges && !isMaterialized + ? DuplicateChangesTrait.ALLOW + : DuplicateChangesTrait.DISALLOW; + } else if (rel instanceof StreamPhysicalLegacySink) { + boolean canConsumeDuplicateChanges = + canConsumeDuplicateChanges((StreamPhysicalLegacySink<?>) rel); + requiredTrait = + canConsumeDuplicateChanges + ? DuplicateChangesTrait.ALLOW + : DuplicateChangesTrait.DISALLOW; + } else if (rel instanceof StreamPhysicalCalcBase) { + // if the calc contains non-deterministic fields, we should not allow duplicate + if (existNonDeterministicFiltersOrProjections((StreamPhysicalCalcBase) rel)) { + requiredTrait = DuplicateChangesTrait.DISALLOW; + } else { + requiredTrait = parentTrait; + } + } else if (rel instanceof StreamPhysicalExchange + || rel instanceof StreamPhysicalMiniBatchAssigner + || rel instanceof StreamPhysicalWatermarkAssigner + || rel instanceof StreamPhysicalDropUpdateBefore + || rel instanceof StreamPhysicalTableSourceScan + || rel instanceof StreamPhysicalDataStreamScan + || rel instanceof StreamPhysicalLegacyTableSourceScan + || rel instanceof StreamPhysicalIntermediateTableScan) { + // forward parent requirement + requiredTrait = parentTrait; + } else { + // if not explicitly supported, all operators should not accept duplicate changes + // TODO consider more operators to support consuming duplicate changes + requiredTrait = DuplicateChangesTrait.DISALLOW; + } + + boolean anyInputUpdated = false; + List<RelNode> inputs = getInputs(rel); + List<RelNode> newInputs = new ArrayList<>(); + for (RelNode input : inputs) { + DuplicateChangesTrait inputOriginalTrait = + input.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE); + if (!requiredTrait.equals(inputOriginalTrait)) { + DuplicateChangesTrait mergedTrait = + getMergedDuplicateChangesTrait(inputOriginalTrait, requiredTrait); + RelNode newInput = + input.copy(input.getTraitSet().plus(mergedTrait), input.getInputs()); + newInputs.add(newInput); + anyInputUpdated = true; + } else { + newInputs.add(input); + } + } + // update parent if a child was updated + if (anyInputUpdated) { + RelNode newRel = rel.copy(rel.getTraitSet(), newInputs); + call.transformTo(newRel); + } + } + + private static DuplicateChangesTrait getMergedDuplicateChangesTrait( + DuplicateChangesTrait inputOriginalTrait, DuplicateChangesTrait newRequiredTrait) { + DuplicateChangesTrait mergedTrait; + if (inputOriginalTrait == null) { + mergedTrait = newRequiredTrait; + } else { + DuplicateChanges mergedDuplicateChanges = + DuplicateChangesUtils.mergeDuplicateChanges( + inputOriginalTrait.getDuplicateChanges(), + newRequiredTrait.getDuplicateChanges()); + mergedTrait = new DuplicateChangesTrait(mergedDuplicateChanges); + } 
+ return mergedTrait; + } + + private boolean existNonDeterministicFiltersOrProjections(StreamPhysicalCalcBase calc) { + RexProgram calcProgram = calc.getProgram(); + // all projections and conditions should be expanded and checked + return !calcProgram.getExprList().stream().allMatch(RexUtil::isDeterministic); + } + + private boolean canConsumeDuplicateChanges(StreamPhysicalSink sink) { + boolean onlyAcceptInsertOnly = false; Review Comment: Using `acceptUpdates|Changes` may be easier to read (then the return clause will be `acceptUpdates && ...`) ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkDuplicateChangesTraitInitProgram.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink; +import org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeProgram; +import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.plan.trait.DuplicateChanges; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait; + +import org.apache.calcite.rel.RelNode; + +/** + * A {@link FlinkOptimizeProgram} that does some initialization for {@link DuplicateChanges} Review Comment: nit: inferences -> inference ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.legacy.table.sinks.RetractStreamTableSink; +import org.apache.flink.legacy.table.sinks.StreamTableSink; +import org.apache.flink.legacy.table.sinks.UpsertStreamTableSink; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.legacy.sinks.TableSink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.trait.DuplicateChanges; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef; +import org.apache.flink.table.planner.plan.utils.DuplicateChangesUtils; +import org.apache.flink.table.planner.sinks.DataStreamTableSink; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexUtil; +import org.immutables.value.Value; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A rule that infers the {@link DuplicateChanges} for each {@link StreamPhysicalRel} node. + * + * <p>The derivation of the trait {@link DuplicateChanges} flows from the root to the leaf, that is, + * from the sink to the source. + * + * <p>Notes: This rule only supports HepPlanner with TOP_DOWN match order. Review Comment: nit: match -> matching ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DuplicateChanges.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.trait; + +/** + * Lists all kinds of all behaviors to describe whether the node can produce duplicated changes for Review Comment: -> 'Lists all kinds of behaviors' ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala: ########## @@ -139,6 +141,16 @@ class RelNodeBlock(val outputNode: RelNode) { def getMiniBatchInterval: MiniBatchInterval = miniBatchInterval + def setAllowDuplicateChanges(allowDuplicateChanges: Boolean): Unit = { + // a child block may have multiple parents (outputs), if one of the parents could not + // accept duplicated changes, then this child block doesn't allow to produce duplicate changes + if (this.allowDuplicateChanges) { + this.allowDuplicateChanges = allowDuplicateChanges + } + } + + def isAllowDuplicateChanges: Boolean = allowDuplicateChanges Review Comment: nit: using `allowDuplicateChanges: Boolean` is simpler ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.legacy.table.sinks.RetractStreamTableSink; +import org.apache.flink.legacy.table.sinks.StreamTableSink; +import org.apache.flink.legacy.table.sinks.UpsertStreamTableSink; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.legacy.sinks.TableSink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.trait.DuplicateChanges; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef; +import org.apache.flink.table.planner.plan.utils.DuplicateChangesUtils; +import org.apache.flink.table.planner.sinks.DataStreamTableSink; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexUtil; +import org.immutables.value.Value; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A rule that infers the {@link DuplicateChanges} for each {@link StreamPhysicalRel} node. + * + * <p>The derivation of the trait {@link DuplicateChanges} flows from the root to the leaf, that is, + * from the sink to the source. + * + * <p>Notes: This rule only supports HepPlanner with TOP_DOWN match order. 
+ */ +@Internal +@Value.Enclosing +public class DuplicateChangesInferRule extends RelRule<DuplicateChangesInferRule.Config> { + + public static final DuplicateChangesInferRule INSTANCE = + DuplicateChangesInferRule.Config.DEFAULT.toRule(); + + protected DuplicateChangesInferRule(Config config) { + super(config); + } + + @Override + public void onMatch(RelOptRuleCall call) { + StreamPhysicalRel rel = call.rel(0); + DuplicateChangesTrait parentTrait = + rel.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE); + Preconditions.checkState(parentTrait != null); + + DuplicateChangesTrait requiredTrait; + if (rel instanceof StreamPhysicalSink) { + boolean canConsumeDuplicateChanges = + canConsumeDuplicateChanges((StreamPhysicalSink) rel); + boolean isMaterialized = ((StreamPhysicalSink) rel).upsertMaterialize(); + requiredTrait = + canConsumeDuplicateChanges && !isMaterialized + ? DuplicateChangesTrait.ALLOW + : DuplicateChangesTrait.DISALLOW; + } else if (rel instanceof StreamPhysicalLegacySink) { + boolean canConsumeDuplicateChanges = + canConsumeDuplicateChanges((StreamPhysicalLegacySink<?>) rel); + requiredTrait = + canConsumeDuplicateChanges + ? DuplicateChangesTrait.ALLOW + : DuplicateChangesTrait.DISALLOW; + } else if (rel instanceof StreamPhysicalCalcBase) { + // if the calc contains non-deterministic fields, we should not allow duplicate + if (existNonDeterministicFiltersOrProjections((StreamPhysicalCalcBase) rel)) { + requiredTrait = DuplicateChangesTrait.DISALLOW; + } else { + requiredTrait = parentTrait; + } + } else if (rel instanceof StreamPhysicalExchange + || rel instanceof StreamPhysicalMiniBatchAssigner + || rel instanceof StreamPhysicalWatermarkAssigner + || rel instanceof StreamPhysicalDropUpdateBefore + || rel instanceof StreamPhysicalTableSourceScan + || rel instanceof StreamPhysicalDataStreamScan + || rel instanceof StreamPhysicalLegacyTableSourceScan + || rel instanceof StreamPhysicalIntermediateTableScan) { + // forward parent requirement + requiredTrait = parentTrait; + } else { + // if not explicitly supported, all operators should not accept duplicate changes + // TODO consider more operators to support consuming duplicate changes + requiredTrait = DuplicateChangesTrait.DISALLOW; + } + + boolean anyInputUpdated = false; + List<RelNode> inputs = getInputs(rel); + List<RelNode> newInputs = new ArrayList<>(); + for (RelNode input : inputs) { + DuplicateChangesTrait inputOriginalTrait = + input.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE); + if (!requiredTrait.equals(inputOriginalTrait)) { + DuplicateChangesTrait mergedTrait = + getMergedDuplicateChangesTrait(inputOriginalTrait, requiredTrait); + RelNode newInput = + input.copy(input.getTraitSet().plus(mergedTrait), input.getInputs()); + newInputs.add(newInput); + anyInputUpdated = true; + } else { + newInputs.add(input); + } + } + // update parent if a child was updated + if (anyInputUpdated) { + RelNode newRel = rel.copy(rel.getTraitSet(), newInputs); + call.transformTo(newRel); + } + } + + private static DuplicateChangesTrait getMergedDuplicateChangesTrait( Review Comment: -> 'mergeDuplicateChangesTrait' ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.legacy.table.sinks.RetractStreamTableSink; +import org.apache.flink.legacy.table.sinks.StreamTableSink; +import org.apache.flink.legacy.table.sinks.UpsertStreamTableSink; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.legacy.sinks.TableSink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.trait.DuplicateChanges; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef; +import org.apache.flink.table.planner.plan.utils.DuplicateChangesUtils; +import org.apache.flink.table.planner.sinks.DataStreamTableSink; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexUtil; +import org.immutables.value.Value; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A rule that infers the {@link DuplicateChanges} for each {@link StreamPhysicalRel} node. + * + * <p>The derivation of the trait {@link DuplicateChanges} flows from the root to the leaf, that is, + * from the sink to the source. + * + * <p>Notes: This rule only supports HepPlanner with TOP_DOWN match order. 
+ */ +@Internal +@Value.Enclosing +public class DuplicateChangesInferRule extends RelRule<DuplicateChangesInferRule.Config> { + + public static final DuplicateChangesInferRule INSTANCE = + DuplicateChangesInferRule.Config.DEFAULT.toRule(); + + protected DuplicateChangesInferRule(Config config) { + super(config); + } + + @Override + public void onMatch(RelOptRuleCall call) { + StreamPhysicalRel rel = call.rel(0); + DuplicateChangesTrait parentTrait = + rel.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE); + Preconditions.checkState(parentTrait != null); + + DuplicateChangesTrait requiredTrait; + if (rel instanceof StreamPhysicalSink) { + boolean canConsumeDuplicateChanges = + canConsumeDuplicateChanges((StreamPhysicalSink) rel); + boolean isMaterialized = ((StreamPhysicalSink) rel).upsertMaterialize(); + requiredTrait = + canConsumeDuplicateChanges && !isMaterialized + ? DuplicateChangesTrait.ALLOW + : DuplicateChangesTrait.DISALLOW; + } else if (rel instanceof StreamPhysicalLegacySink) { + boolean canConsumeDuplicateChanges = + canConsumeDuplicateChanges((StreamPhysicalLegacySink<?>) rel); + requiredTrait = + canConsumeDuplicateChanges + ? DuplicateChangesTrait.ALLOW + : DuplicateChangesTrait.DISALLOW; + } else if (rel instanceof StreamPhysicalCalcBase) { + // if the calc contains non-deterministic fields, we should not allow duplicate + if (existNonDeterministicFiltersOrProjections((StreamPhysicalCalcBase) rel)) { + requiredTrait = DuplicateChangesTrait.DISALLOW; + } else { + requiredTrait = parentTrait; + } + } else if (rel instanceof StreamPhysicalExchange + || rel instanceof StreamPhysicalMiniBatchAssigner + || rel instanceof StreamPhysicalWatermarkAssigner + || rel instanceof StreamPhysicalDropUpdateBefore + || rel instanceof StreamPhysicalTableSourceScan + || rel instanceof StreamPhysicalDataStreamScan + || rel instanceof StreamPhysicalLegacyTableSourceScan + || rel instanceof StreamPhysicalIntermediateTableScan) { + // forward parent requirement + requiredTrait = parentTrait; + } else { + // if not explicitly supported, all operators should not accept duplicate changes + // TODO consider more operators to support consuming duplicate changes + requiredTrait = DuplicateChangesTrait.DISALLOW; + } + + boolean anyInputUpdated = false; + List<RelNode> inputs = getInputs(rel); + List<RelNode> newInputs = new ArrayList<>(); + for (RelNode input : inputs) { + DuplicateChangesTrait inputOriginalTrait = + input.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE); + if (!requiredTrait.equals(inputOriginalTrait)) { + DuplicateChangesTrait mergedTrait = + getMergedDuplicateChangesTrait(inputOriginalTrait, requiredTrait); + RelNode newInput = + input.copy(input.getTraitSet().plus(mergedTrait), input.getInputs()); + newInputs.add(newInput); + anyInputUpdated = true; + } else { + newInputs.add(input); + } + } + // update parent if a child was updated + if (anyInputUpdated) { + RelNode newRel = rel.copy(rel.getTraitSet(), newInputs); + call.transformTo(newRel); + } + } + + private static DuplicateChangesTrait getMergedDuplicateChangesTrait( + DuplicateChangesTrait inputOriginalTrait, DuplicateChangesTrait newRequiredTrait) { + DuplicateChangesTrait mergedTrait; + if (inputOriginalTrait == null) { + mergedTrait = newRequiredTrait; + } else { + DuplicateChanges mergedDuplicateChanges = + DuplicateChangesUtils.mergeDuplicateChanges( + inputOriginalTrait.getDuplicateChanges(), + newRequiredTrait.getDuplicateChanges()); + mergedTrait = new DuplicateChangesTrait(mergedDuplicateChanges); + } 
+ return mergedTrait; + } + + private boolean existNonDeterministicFiltersOrProjections(StreamPhysicalCalcBase calc) { + RexProgram calcProgram = calc.getProgram(); + // all projections and conditions should be expanded and checked + return !calcProgram.getExprList().stream().allMatch(RexUtil::isDeterministic); Review Comment: Can be simplified to `FlinkRexUtil.isDeterministic(calc.getProgram())` ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala: ########## @@ -139,6 +141,16 @@ class RelNodeBlock(val outputNode: RelNode) { def getMiniBatchInterval: MiniBatchInterval = miniBatchInterval + def setAllowDuplicateChanges(allowDuplicateChanges: Boolean): Unit = { Review Comment: Based on the logic, it might be more appropriate to call this `mergeXx` instead of a setter. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DuplicateChangesUtils.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.utils; + +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel; +import org.apache.flink.table.planner.plan.trait.DuplicateChanges; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTrait; +import org.apache.flink.table.planner.plan.trait.DuplicateChangesTraitDef; + +import java.util.Optional; + +/** Utils for {@link DuplicateChanges}. */ +public class DuplicateChangesUtils { + + private DuplicateChangesUtils() {} + + /** + * Get an optional {@link DuplicateChanges} from the given {@link StreamPhysicalRel}. + * + * <p>The {@link DuplicateChanges} is inferred from {@link DuplicateChangesTrait}. + */ + public static Optional<DuplicateChanges> getDuplicateChanges(StreamPhysicalRel rel) { + Optional<DuplicateChangesTrait> duplicateChangesTraitOp = + Optional.ofNullable(rel.getTraitSet().getTrait(DuplicateChangesTraitDef.INSTANCE)); + return duplicateChangesTraitOp.stream() + .map(DuplicateChangesTrait::getDuplicateChanges) + .findFirst(); + } + + /** + * Merge the given two {@link DuplicateChanges} as a new one. + * + * <p>The logic matrix is following: + * + * <pre> + * +-------------+-------------+---------------+ + * | origin_1 | origin_2 | merge result | + * +-------------+-------------+---------------+ + * | NONE | `ANY` | `ANY` | Review Comment: What does`ANY` stand for? Didn't see related logic in the method. 
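For context on the `ANY` question above: in the quoted matrix, `ANY` presumably acts as a wildcard for "whatever the other operand is", i.e. `NONE` behaves as the identity. Below is a Scala-flavored sketch of merge semantics consistent with that reading; it is only an illustration, not the actual body of `DuplicateChangesUtils#mergeDuplicateChanges`, and it assumes (rather than confirms from the PR) that a `DISALLOW` from either side dominates and that `ALLOW` merged with `ALLOW` stays `ALLOW`.

```scala
import org.apache.flink.table.planner.plan.`trait`.DuplicateChanges

object MergeDuplicateChangesSketch {

  // NONE is treated as "no requirement yet", so it yields the other operand ("ANY");
  // a DISALLOW from either side is assumed to win; only ALLOW + ALLOW stays ALLOW.
  def merge(left: DuplicateChanges, right: DuplicateChanges): DuplicateChanges =
    (left, right) match {
      case (DuplicateChanges.NONE, other) => other
      case (other, DuplicateChanges.NONE) => other
      case (DuplicateChanges.DISALLOW, _) | (_, DuplicateChanges.DISALLOW) =>
        DuplicateChanges.DISALLOW
      case _ => DuplicateChanges.ALLOW
    }
}
```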
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala: ########## @@ -105,6 +105,8 @@ class RelNodeBlock(val outputNode: RelNode) { private var miniBatchInterval: MiniBatchInterval = MiniBatchInterval.NONE + private var allowDuplicateChanges: Boolean = true Review Comment: Using `false` as the default value is more intuitive
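Putting the two RelNodeBlock comments together (a merge-style method name instead of a conditional setter, and the question of the initial value), one possible shape is sketched below; the class and field names other than those in the quoted diff are illustrative only.

```scala
class RelNodeBlockSketch {
  // The quoted diff starts from `true`, which is the identity of the conjunction used
  // below; starting from `false` as suggested above would need a different merge
  // strategy, e.g. treating the first caller specially.
  private var duplicateChangesAllowed: Boolean = true

  // A child block may have multiple parent blocks; it may only produce duplicate
  // changes if every parent allows it, so the flag can only be narrowed.
  def mergeAllowDuplicateChanges(allow: Boolean): Unit = {
    duplicateChangesAllowed = duplicateChangesAllowed && allow
  }

  def allowDuplicateChanges: Boolean = duplicateChangesAllowed
}
```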