lincoln-lil commented on code in PR #20393:
URL: https://github.com/apache/flink/pull/20393#discussion_r936658329


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java:
##########
@@ -0,0 +1,1009 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.OverSpec;
+import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCorrelateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExpand;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalGroupAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLimit;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMatch;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSort;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSortLimit;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalSort;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalUnion;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalValues;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowRank;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowTableFunction;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.OverAggregateUtil;
+import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An inner visitor to validate whether there are any NDU (non-deterministic update) problems that
+ * may cause wrong results, and to try to rewrite lookup join nodes with materialization (to
+ * eliminate the non-determinism generated by the lookup join node only).
+ *
+ * <p>The visitor tries to satisfy the required determinism (represented by an ImmutableBitSet)
+ * from the root. The transmission rules of required determinism are:
+ *
+ * <p>0. all required determinism is under the precondition that the input has updates, that is to
+ * say, no update determinism will be passed to an insert-only stream
+ *
+ * <p>1. the initial required determinism for the root node (e.g., the sink node) is none
+ *
+ * <p>2. a relNode processes two aspects: it must satisfy the non-empty required determinism, and
+ * it may actively require determinism from its input due to its own requirements (e.g., a
+ * stateful node that works in retract-by-row mode)
+ *
+ * <pre>{@code
+ * Rel3
+ *  | require input
+ *  v
+ * Rel2 {1. satisfy Rel3's requirement 2. append new requirement to input Rel1}
+ *  | require input
+ *  v
+ * Rel1
+ * }</pre>
+ *
+ * <p>the requiredDeterminism passed to the input will exclude columns which were in the
+ * upsertKey, e.g.,
+ *
+ * <pre>{@code
+ * Sink {pk=(c3)} requiredDeterminism=(c3)
+ *   | passed requiredDeterminism={}
+ *  GroupAgg{group by c3, day} append requiredDeterminism=(c3, day)
+ *   | passed requiredDeterminism=(c3, day)
+ * Project{select c1,c2,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') day,...} [x] can not satisfy
+ *   |
+ * Deduplicate{keep last row, dedup on c1,c2}
+ *   |
+ *  Scan
+ * }</pre>
+ *
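+ * <p>For illustration, a query shaped like the plan above could be an INSERT INTO sink SELECT
+ * c3, day, ... GROUP BY c3, day over a deduplicated (hence updating) input, where day =
+ * DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'); the GroupAgg requires the grouping column 'day' to
+ * be deterministic, but the Project derives it from a non-deterministic function, so the
+ * requirement cannot be satisfied and the plan is rejected.
+ *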
+ * <p>3. a sink node requires determinism on the key columns when a primary key is defined, or
+ * determinism on all columns when no primary key is defined
+ *
+ * <p>4. for a cdc source node (which generates updates), the metadata columns are treated as
+ * non-deterministic.
+ */
+public class StreamNonDeterministicUpdatePlanVisitor {
+    public static final ImmutableBitSet NO_REQUIRED_DETERMINISM = ImmutableBitSet.of();
+
+    public static final String NON_DETERMINISTIC_CONDITION_ERROR_MSG_TEMPLATE =
+            "There exists non-deterministic function: '%s' in condition: '%s' which may cause wrong result in update pipeline.";
+
+    public StreamPhysicalRel visit(
+            final StreamPhysicalRel rel, final ImmutableBitSet requireDeterminism) {
+        if (rel instanceof StreamPhysicalSink) {
+            if (inputInsertOnly(rel)) {
+                // for an append stream, do not care about NDU
+                return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM);
+            } else {
+                // for an update stream:
+                // 1. sink with pk:
+                // an upsert sink updates by pk; ideally pk == input.upsertKey
+                // (otherwise upsertMaterialize will handle it)
+
+                // 1.1 input.upsertKey nonEmpty -> do not care about NDU
+                // 1.2 input.upsertKey isEmpty -> retract by complete row, must not contain NDU
+
+                // once the sink's requirement on the pk is satisfied, no further request will be
+                // transmitted; a new requirement is only generated at a stateful node whose input
+                // has updates (e.g., grouping keys)
+
+                // 2. sink without pk:
+                // a retract sink retracts by complete row (all input columns should be
+                // deterministic); whether input.upsertKey is empty or not, it must not contain
+                // NDU
+                StreamPhysicalSink sink = (StreamPhysicalSink) rel;
+                int[] primaryKey =
+                        sink.contextResolvedTable().getResolvedSchema().getPrimaryKeyIndexes();
+                ImmutableBitSet requireInputDeterminism;
+                if (sink.upsertMaterialize() || null == primaryKey || primaryKey.length == 0) {
+                    // SinkUpsertMaterializer only supports the no-upsertKey mode, which means all
+                    // input columns should be deterministic (same as no primary key defined on
+                    // the sink)
+                    // TODO should optimize it after SinkUpsertMaterializer supports upsertKey,
+                    // see FLINK-28569.
+                    requireInputDeterminism =
+                            ImmutableBitSet.range(sink.getInput().getRowType().getFieldCount());
+                } else {
+                    requireInputDeterminism = ImmutableBitSet.of(primaryKey);
+                }
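+                // for illustration (hypothetical schema): with CREATE TABLE snk (a INT, b STRING,
+                // PRIMARY KEY (a) NOT ENFORCED), requireInputDeterminism = {0}; with no primary
+                // key, or when upsertMaterialize is enabled, it is {0, 1}.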
+                return transmitDeterminismRequirement(sink, requireInputDeterminism);
+            }
+        } else if (rel instanceof StreamPhysicalLegacySink<?>) {
+            if (inputInsertOnly(rel)) {
+                // for an append stream, do not care about NDU
+                return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM);
+            } else {
+                StreamPhysicalLegacySink sink = (StreamPhysicalLegacySink) rel;
+                TableSchema tableSchema = sink.sink().getTableSchema();
+                Optional<UniqueConstraint> primaryKey = tableSchema.getPrimaryKey();
+                List<String> columns = Arrays.asList(tableSchema.getFieldNames());
+                // SinkUpsertMaterializer does not support the legacy sink
+                ImmutableBitSet requireInputDeterminism;
+                if (primaryKey.isPresent()) {
+                    requireInputDeterminism =
+                            ImmutableBitSet.of(
+                                    primaryKey.get().getColumns().stream()
+                                            .map(col -> columns.indexOf(col))
+                                            .collect(Collectors.toList()));
+                } else {
+                    requireInputDeterminism = ImmutableBitSet.range(columns.size());
+                }
+                return transmitDeterminismRequirement(rel, requireInputDeterminism);
+            }
+        } else if (rel instanceof StreamPhysicalCalcBase) {
+            if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
+                // for an append stream, do not care about NDU
+                return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM);
+            } else {
+                // if the input has updates, any non-deterministic condition is not acceptable,
+                // and requireDeterminism should also be satisfied.
+                StreamPhysicalCalcBase calc = (StreamPhysicalCalcBase) rel;
+                checkNonDeterministicRexProgram(requireDeterminism, calc.getProgram(), calc);
+
+                // evaluate the required determinism from the input
+                List<RexNode> projects =
+                        calc.getProgram().getProjectList().stream()
+                                .map(expr -> calc.getProgram().expandLocalRef(expr))
+                                .collect(Collectors.toList());
+                Map<Integer, Integer> outFromSourcePos = extractSourceMapping(projects);
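+                // for illustration (assuming extractSourceMapping maps each output position to
+                // its source input position, with -1 for outputs that are not simple input
+                // references): for projects [$0, $2, UPPER($1)] the mapping would be
+                // {0 -> 0, 1 -> 2, 2 -> -1}, so a requirement on output 1 is converted to
+                // input 2, and output 2 is dropped by the filter below.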
+                List<Integer> conv2Inputs =
+                        requireDeterminism.toList().stream()
+                                .map(
+                                        out ->
+                                                Optional.ofNullable(outFromSourcePos.get(out))
+                                                        .orElseThrow(
+                                                                () ->
+                                                                        new TableException(
+                                                                                String.format(
+                                                                                        "Invalid pos:%d over projection:%s",
+                                                                                        out,
+                                                                                        calc.getProgram()))))
+                                .filter(index -> index != -1)
+                                .collect(Collectors.toList());
+
+                return transmitDeterminismRequirement(calc, ImmutableBitSet.of(conv2Inputs));
+            }
+        } else if (rel instanceof StreamPhysicalCorrelateBase) {
+            if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
+                return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM);
+            } else {
+                // check for a non-deterministic condition (may exist after FLINK-7865 is fixed).
+                StreamPhysicalCorrelateBase correlate = (StreamPhysicalCorrelateBase) rel;
+                if (correlate.condition().isDefined()) {
+                    RexNode rexNode = correlate.condition().get();
+                    checkNonDeterministicCondition(rexNode, correlate);
+                }
+                // check if the table function itself is non-deterministic
+                int leftFieldCnt = correlate.inputRel().getRowType().getFieldCount();
+                Optional<String> ndCall = getNonDeterministicCallName(correlate.scan().getCall());
+                if (ndCall.isPresent()) {
+                    // no column from the table function scan can satisfy the required determinism
+                    List<Integer> unsatisfiedColumns =
+                            requireDeterminism.toList().stream()
+                                    .filter(index -> index >= leftFieldCnt)
+                                    .collect(Collectors.toList());
+                    if (!unsatisfiedColumns.isEmpty()) {
+                        String errorMsg =
+                                generateNonDeterministicColumnsErrorMessage(
+                                        unsatisfiedColumns,
+                                        correlate.getRowType(),
+                                        correlate,
+                                        null,
+                                        ndCall);
+                        throw new TableException(errorMsg);
+                    }
+                }
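+                // for illustration (hypothetical function): for LATERAL TABLE(nd_func(c1)) over a
+                // two-column left input, output indexes >= 2 come from the non-deterministic
+                // function and can never satisfy the required determinism, while indexes 0..1
+                // are simply forwarded to the left input below.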
+                // evaluate the required determinism from the input
+                List<Integer> fromLeft =
+                        requireDeterminism.toList().stream()
+                                .filter(index -> index < leftFieldCnt)
+                                .collect(Collectors.toList());
+                if (fromLeft.isEmpty()) {
+                    return transmitDeterminismRequirement(correlate, NO_REQUIRED_DETERMINISM);
+                }
+                return transmitDeterminismRequirement(correlate, ImmutableBitSet.of(fromLeft));
+            }
+
+        } else if (rel instanceof StreamPhysicalLookupJoin) {
+            if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
+                return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM);
+            } else {
+                /*
+                 * If the input has updates, the lookup join may produce non-deterministic results
+                 * itself because the data in the backing lookup source may change over time. We
+                 * can try to eliminate this non-determinism by adding materialization to the join
+                 * operator, but some non-determinism cannot be solved that way: 1. the join
+                 * condition 2. the inner calc in the lookup join.
+                 */
+                StreamPhysicalLookupJoin lookupJoin = (StreamPhysicalLookupJoin) rel;
+
+                // the required determinism cannot be satisfied, even if upsert materialize is
+                // enabled, when:
+                // 1. the remaining join condition contains a non-deterministic call
+                if (lookupJoin.remainingCondition().isDefined()) {
+                    RexNode rexNode = lookupJoin.remainingCondition().get();
+                    checkNonDeterministicCondition(rexNode, lookupJoin);
+                }
+                // 2. the inner calc in the lookup join contains non-deterministic conditions or
+                // calls
+                if (lookupJoin.calcOnTemporalTable().isDefined()) {
+                    checkNonDeterministicRexProgram(
+                            requireDeterminism, lookupJoin.calcOnTemporalTable().get(), lookupJoin);
+                }
+
+                // Try to resolve the non-determinism by adding materialization, which can
+                // eliminate the non-determinism produced by a lookup join against an evolving
+                // source.
+                int leftFieldCnt = lookupJoin.getInput().getRowType().getFieldCount();
+                List<Integer> requireRight =
+                        requireDeterminism.toList().stream()
+                                .filter(index -> index >= leftFieldCnt)
+                                .collect(Collectors.toList());
+                boolean omitUpsertMaterialize = false;
+                // two cases where materialization can be omitted: 1. no field from the lookup
+                // source is required 2. the lookup key contains the pk and there is no
+                // requirement on other fields; otherwise upsert materialize cannot be omitted.
+                if (requireRight.isEmpty()) {
+                    omitUpsertMaterialize = true;
+                } else {
+                    int[] outputPkIdx = lookupJoin.getOutputPrimaryKeyIndexes();
+                    ImmutableBitSet outputPkBitSet = ImmutableBitSet.of(outputPkIdx);
+                    // outputPkIdx needs to be used here, so not using
+                    // #lookupKeyContainsPrimaryKey directly.
+                    omitUpsertMaterialize =
+                            null != outputPkIdx
+                                    && outputPkIdx.length > 0
+                                    && Arrays.stream(outputPkIdx)
+                                            .allMatch(
+                                                    index ->
+                                                            lookupJoin
+                                                                    .allLookupKeys()
+                                                                    .contains(index))
+                                    && requireRight.stream()
+                                            .allMatch(index -> outputPkBitSet.get(index));
+                }
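+                // for illustration (hypothetical tables): joining src to a lookup table
+                // dim(id PK, rate) on src.k = dim.id, a downstream requirement that only touches
+                // dim.id (the output pk, fully covered by the lookup key) lets materialization be
+                // omitted, while any requirement on dim.rate forces lookupJoin.copy(true) below.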
+                List<Integer> requireLeft =
+                        requireDeterminism.toList().stream()
+                                .filter(index -> index < leftFieldCnt)
+                                .collect(Collectors.toList());
+
+                if (omitUpsertMaterialize) {
+                    return transmitDeterminismRequirement(
+                            lookupJoin, ImmutableBitSet.of(requireLeft));
+                } else {
+                    // enable materialization for the lookup join
+                    return transmitDeterminismRequirement(
+                            lookupJoin.copy(true), ImmutableBitSet.of(requireLeft));
+                }
+            }
+        } else if (rel instanceof StreamPhysicalTableSourceScan) {
+            // a table scan has no input, so only check the metadata from a cdc source
+            if (!requireDeterminism.isEmpty()) {
+                StreamPhysicalTableSourceScan tableScan = (StreamPhysicalTableSourceScan) rel;
+                boolean insertOnly =
+                        tableScan.tableSource().getChangelogMode().containsOnly(RowKind.INSERT);
+                boolean supportsReadingMetadata =
+                        tableScan.tableSource() instanceof SupportsReadingMetadata;
+                if (!insertOnly && supportsReadingMetadata) {
+                    TableSourceTable sourceTable =
+                            tableScan.getTable().unwrap(TableSourceTable.class);
+                    // check if requireDeterminism contains a metadata column
+                    List<Column.MetadataColumn> metadataColumns =
+                            DynamicSourceUtils.extractMetadataColumns(
+                                    sourceTable.contextResolvedTable().getResolvedSchema());
+                    Set<String> metaColumnSet =
+                            metadataColumns.stream()
+                                    .map(col -> col.getName())
+                                    .collect(Collectors.toSet());
+                    List<String> columns = tableScan.getRowType().getFieldNames();
+                    List<String> metadataCauseErr = new ArrayList<>();
+                    for (int index = 0; index < columns.size(); index++) {
+                        String column = columns.get(index);
+                        if (metaColumnSet.contains(column) && requireDeterminism.get(index)) {
+                            metadataCauseErr.add(column);
+                        }
+                    }
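+                    // for illustration (hypothetical column): in a cdc table that declares a
+                    // metadata column such as op_ts, a downstream requirement on op_ts lands in
+                    // metadataCauseErr, because the metadata value may differ between the insert
+                    // and the subsequent update/delete of the same row.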
+                    if (!metadataCauseErr.isEmpty()) {
+                        StringBuilder errorMsg = new StringBuilder();
+                        errorMsg.append("The metadata column(s): '")
+                                .append(String.join(", ", metadataCauseErr.toArray(new String[0])))
+                                .append("' in cdc source may cause wrong result or error on")
+                                .append(" downstream operators, please consider removing these")
+                                .append(" columns or use a non-cdc source that only has insert")
+                                .append(" messages.\nsource node:\n")
+                                .append(
+                                        FlinkRelOptUtil.toString(
+                                                tableScan,
+                                                SqlExplainLevel.DIGEST_ATTRIBUTES,
+                                                false,
+                                                true,
+                                                false,
+                                                true));
+                        throw new TableException(errorMsg.toString());
+                    }
+                }
+            }
+            return rel;
+        } else if (rel instanceof StreamPhysicalLegacyTableSourceScan
+                || rel instanceof StreamPhysicalDataStreamScan
+                || rel instanceof StreamPhysicalValues) {
+            // not a cdc source, end the visit
+            return rel;
+        } else if (rel instanceof StreamPhysicalGroupAggregateBase) {
+            // output row type = grouping keys + aggCalls
+            StreamPhysicalGroupAggregateBase groupAgg = (StreamPhysicalGroupAggregateBase) rel;
+            if (inputInsertOnly(groupAgg)) {

Review Comment:
   sigh... there is different logic for groupAgg, overAgg, windowAgg, regular join and source nodes


