lincoln-lil commented on code in PR #20393:
URL: https://github.com/apache/flink/pull/20393#discussion_r936658329
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java:
##########
@@ -0,0 +1,1009 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.OverSpec;
+import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCorrelateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExpand;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalGroupAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLimit;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMatch;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSort;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSortLimit;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalSort;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalUnion;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalValues;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregateBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowRank;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowTableFunction;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.OverAggregateUtil;
+import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An inner visitor to validate if there's any NDU problems which may cause wrong result and try to
+ * rewrite lookup join node with materialization (to eliminate the non determinism generated by
+ * lookup join node only).
+ *
+ * <p>The visitor will try to satisfy the required determinism(represent by ImmutableBitSet) from
+ * root. The transmission rule of required determinism:
+ *
+ * <p>0. all required determinism is under the precondition: input has updates, that is say no
+ * update determinism will be passed to an insert only stream
+ *
+ * <p>1. the initial required determinism to the root node(e.g., sink node) was none
+ *
+ * <p>2. for a relNode, it will process on two aspects: - can satisfy non-empty required determinism
+ * - actively requires determinism from input by self requirements(e.g., stateful node works on
+ * retract by row mode)
+ *
+ * <pre>{@code
+ * Rel3
+ *   |  require input
+ *   v
+ * Rel2 {1. satisfy Rel3's requirement 2. append new requirement to input Rel1}
+ *   |  require input
+ *   v
+ * Rel1
+ * }</pre>
+ *
+ * <p>the requiredDeterminism passed to input will exclude columns which were in upsertKey, e.g.,
+ *
+ * <pre>{@code
+ * Sink {pk=(c3)} requiredDeterminism=(c3)
+ *   |  passed requiredDeterminism={}
+ * GroupAgg{group by c3, day} append requiredDeterminism=(c3, day)
+ *   |  passed requiredDeterminism=(c3, day)
+ * Project{select c1,c2,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') day,...} [x] can not satisfy
+ *   |
+ * Deduplicate{keep last row, dedup on c1,c2}
+ *   |
+ * Scan
+ * }</pre>
+ *
+ * <p>3. for a sink node, it will require key columns' determinism when primary key is defined or
+ * require all columns' determinism when no primary key is defined
+ *
+ * <p>4. for a cdc source node(which will generate updates), the metadata columns are treated as
+ * non-deterministic.
+ */
+public class StreamNonDeterministicUpdatePlanVisitor {
+    public static final ImmutableBitSet NO_REQUIRED_DETERMINISM = ImmutableBitSet.of();
+
+    public static final String NON_DETERMINISTIC_CONDITION_ERROR_MSG_TEMPLATE =
+            "There exists non deterministic function: '%s' in condition: '%s' which may cause wrong result in update pipeline.";
+
+    public StreamPhysicalRel visit(
+            final StreamPhysicalRel rel, final ImmutableBitSet requireDeterminism) {
+        if (rel instanceof StreamPhysicalSink) {
+            if (inputInsertOnly(rel)) {
+                // for append stream, not care about NDU
+                return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM);
+            } else {
+                // for update streaming, when
+                // 1. sink with pk:
+                // upsert sink, update by pk, ideally pk == input.upsertKey,
+                // (otherwise upsertMaterialize will handle it)
+
+                // 1.1 input.upsertKey nonEmpty -> not care about NDU
+                // 1.2 input.upsertKey isEmpty -> retract by complete row, must not contain NDU
+
+                // once sink's requirement on pk was satisfied, no further request will be transited
+                // only when new requirement generated at stateful node which input has update
+                // (e.g., grouping keys)
+
+                // 2. sink without pk:
+                // retract sink, retract by complete row (all input columns should be deterministic)
+                // whether input.upsertKey is empty or not, must not contain NDU
+                StreamPhysicalSink sink = (StreamPhysicalSink) rel;
+                int[] primaryKey =
+                        sink.contextResolvedTable().getResolvedSchema().getPrimaryKeyIndexes();
+                ImmutableBitSet requireInputDeterminism;
+                if (sink.upsertMaterialize() || null == primaryKey || primaryKey.length == 0) {
+                    // SinkUpsertMaterializer only support no upsertKey mode, it says all input
+                    // columns should be deterministic (same as no primary key defined on sink)
+                    // TODO should optimize it after SinkUpsertMaterializer support upsertKey
+                    // FLINK-28569.
+                    requireInputDeterminism =
+                            ImmutableBitSet.range(sink.getInput().getRowType().getFieldCount());
+                } else {
+                    requireInputDeterminism = ImmutableBitSet.of(primaryKey);
+                }
+                return transmitDeterminismRequirement(sink, requireInputDeterminism);
+            }
+        } else if (rel instanceof StreamPhysicalLegacySink<?>) {
+            if (inputInsertOnly(rel)) {
+                // for append stream, not care about NDU
+                return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM);
+            } else {
+                StreamPhysicalLegacySink sink = (StreamPhysicalLegacySink) rel;
+                TableSchema tableSchema = sink.sink().getTableSchema();
+                Optional<UniqueConstraint> primaryKey = tableSchema.getPrimaryKey();
+                List<String> columns = Arrays.asList(tableSchema.getFieldNames());
+                // SinkUpsertMaterializer does not support legacy sink
+                ImmutableBitSet requireInputDeterminism;
+                if (primaryKey.isPresent()) {
+                    requireInputDeterminism =
+                            ImmutableBitSet.of(
+                                    primaryKey.get().getColumns().stream()
+                                            .map(col -> columns.indexOf(col))
+                                            .collect(Collectors.toList()));
+                } else {
+                    requireInputDeterminism = ImmutableBitSet.range(columns.size());
+                }
+                return transmitDeterminismRequirement(rel, requireInputDeterminism);
+            }
+        } else if (rel instanceof StreamPhysicalCalcBase) {
+            if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
+                // for append stream, not care about NDU
+                return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM);
+            } else {
+                // if input has updates, any non-deterministic conditions are not acceptable, also
+                // requireDeterminism should be satisfied.
+                StreamPhysicalCalcBase calc = (StreamPhysicalCalcBase) rel;
+                checkNonDeterministicRexProgram(requireDeterminism, calc.getProgram(), calc);
+
+                // evaluate required determinism from input
+                List<RexNode> projects =
+                        calc.getProgram().getProjectList().stream()
+                                .map(expr -> calc.getProgram().expandLocalRef(expr))
+                                .collect(Collectors.toList());
+                Map<Integer, Integer> outFromSourcePos = extractSourceMapping(projects);
+                List<Integer> conv2Inputs =
+                        requireDeterminism.toList().stream()
+                                .map(
+                                        out ->
+                                                Optional.ofNullable(outFromSourcePos.get(out))
+                                                        .orElseThrow(
+                                                                () ->
+                                                                        new TableException(
+                                                                                String.format(
+                                                                                        "Invalid pos:%d over projection:%s",
+                                                                                        out,
+                                                                                        calc.getProgram()))))
+                                .filter(index -> index != -1)
+                                .collect(Collectors.toList());
+
+                return transmitDeterminismRequirement(calc, ImmutableBitSet.of(conv2Inputs));
+            }
+        } else if (rel instanceof StreamPhysicalCorrelateBase) {
+            if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
+                return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM);
+            } else {
+                // check if non-deterministic condition (may exist after FLINK-7865 was fixed).
+                StreamPhysicalCorrelateBase correlate = (StreamPhysicalCorrelateBase) rel;
+                if (correlate.condition().isDefined()) {
+                    RexNode rexNode = correlate.condition().get();
+                    checkNonDeterministicCondition(rexNode, correlate);
+                }
+                // check if it is a non-deterministic function
+                int leftFieldCnt = correlate.inputRel().getRowType().getFieldCount();
+                Optional<String> ndCall = getNonDeterministicCallName(correlate.scan().getCall());
+                if (ndCall.isPresent()) {
+                    // all columns from table function scan cannot satisfy the required determinism
+                    List<Integer> unsatisfiedColumns =
+                            requireDeterminism.toList().stream()
+                                    .filter(index -> index >= leftFieldCnt)
+                                    .collect(Collectors.toList());
+                    if (!unsatisfiedColumns.isEmpty()) {
+                        String errorMsg =
+                                generateNonDeterministicColumnsErrorMessage(
+                                        unsatisfiedColumns,
+                                        correlate.getRowType(),
+                                        correlate,
+                                        null,
+                                        ndCall);
+                        throw new TableException(errorMsg);
+                    }
+                }
+                // evaluate required determinism from input
+                List<Integer> fromLeft =
+                        requireDeterminism.toList().stream()
+                                .filter(index -> index < leftFieldCnt)
+                                .collect(Collectors.toList());
+                if (fromLeft.isEmpty()) {
+                    return transmitDeterminismRequirement(correlate, NO_REQUIRED_DETERMINISM);
+                }
+                return transmitDeterminismRequirement(correlate, ImmutableBitSet.of(fromLeft));
+            }
+
+        } else if (rel instanceof StreamPhysicalLookupJoin) {
+            if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
+                return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM);
+            } else {
+                /**
+                 * if input has updates, the lookup join may produce non-deterministic result itself
+                 * due to backed lookup source which data may change over time, we can try to
+                 * eliminate this non-determinism by adding materialization to the join operator,
+                 * but still exists non-determinism we cannot solve: 1. join condition 2. the inner
+                 * calc in lookJoin.
+                 */
+                StreamPhysicalLookupJoin lookupJoin = (StreamPhysicalLookupJoin) rel;
+
+                // required determinism cannot be satisfied even upsert materialize was enabled if:
+                // 1. remaining join condition contains non-deterministic call
+                if (lookupJoin.remainingCondition().isDefined()) {
+                    RexNode rexNode = lookupJoin.remainingCondition().get();
+                    checkNonDeterministicCondition(rexNode, lookupJoin);
+                }
+                // 2. inner calc in lookJoin contains either non-deterministic condition or calls
+                if (lookupJoin.calcOnTemporalTable().isDefined()) {
+                    checkNonDeterministicRexProgram(
+                            requireDeterminism, lookupJoin.calcOnTemporalTable().get(), lookupJoin);
+                }
+
+                // Try to resolve non-determinism by adding materialization which can eliminate
+                // non-determinism produced by lookup join via an evolving source.
+                int leftFieldCnt = lookupJoin.getInput().getRowType().getFieldCount();
+                List<Integer> requireRight =
+                        requireDeterminism.toList().stream()
+                                .filter(index -> index >= leftFieldCnt)
+                                .collect(Collectors.toList());
+                boolean omitUpsertMaterialize = false;
+                // two optimizations: 1. no fields from lookup source was required 2. lookup key
+                // contains pk and no requirement on other fields we can omit materialization,
+                // otherwise upsert materialize can not be omitted.
+                if (requireRight.isEmpty()) {
+                    omitUpsertMaterialize = true;
+                } else {
+                    int[] outputPkIdx = lookupJoin.getOutputPrimaryKeyIndexes();
+                    ImmutableBitSet outputPkBitSet = ImmutableBitSet.of(outputPkIdx);
+                    // outputPkIdx need to used so not using #lookupKeyContainsPrimaryKey directly.
+                    omitUpsertMaterialize =
+                            null != outputPkIdx
+                                    && outputPkIdx.length > 0
+                                    && Arrays.stream(outputPkIdx)
+                                            .allMatch(
+                                                    index ->
+                                                            lookupJoin
+                                                                    .allLookupKeys()
+                                                                    .contains(index))
+                                    && requireRight.stream()
+                                            .allMatch(index -> outputPkBitSet.get(index));
+                }
+                List<Integer> requireLeft =
+                        requireDeterminism.toList().stream()
+                                .filter(index -> index < leftFieldCnt)
+                                .collect(Collectors.toList());
+
+                if (omitUpsertMaterialize) {
+                    return transmitDeterminismRequirement(
+                            lookupJoin, ImmutableBitSet.of(requireLeft));
+                } else {
+                    // enable materialize for lookup join
+                    return transmitDeterminismRequirement(
+                            lookupJoin.copy(true), ImmutableBitSet.of(requireLeft));
+                }
+            }
+        } else if (rel instanceof StreamPhysicalTableSourceScan) {
+            // tableScan has no input, so only check meta data from cdc source
+            if (!requireDeterminism.isEmpty()) {
+                StreamPhysicalTableSourceScan tableScan = (StreamPhysicalTableSourceScan) rel;
+                boolean insertOnly =
+                        tableScan.tableSource().getChangelogMode().containsOnly(RowKind.INSERT);
+                boolean supportsReadingMetadata =
+                        tableScan.tableSource() instanceof SupportsReadingMetadata;
+                if (!insertOnly && supportsReadingMetadata) {
+                    TableSourceTable sourceTable =
+                            tableScan.getTable().unwrap(TableSourceTable.class);
+                    // check if requireDeterminism contains metadata column
+                    List<Column.MetadataColumn> metadataColumns =
+                            DynamicSourceUtils.extractMetadataColumns(
+                                    sourceTable.contextResolvedTable().getResolvedSchema());
+                    Set<String> metaColumnSet =
+                            metadataColumns.stream()
+                                    .map(col -> col.getName())
+                                    .collect(Collectors.toSet());
+                    List<String> columns = tableScan.getRowType().getFieldNames();
+                    List<String> metadataCauseErr = new ArrayList<>();
+                    for (int index = 0; index < columns.size(); index++) {
+                        String column = columns.get(index);
+                        if (metaColumnSet.contains(column) && requireDeterminism.get(index)) {
+                            metadataCauseErr.add(column);
+                        }
+                    }
+                    if (!metadataCauseErr.isEmpty()) {
+                        StringBuilder errorMsg = new StringBuilder();
+                        errorMsg.append("The metadata column(s): '")
+                                .append(String.join(", ", metadataCauseErr.toArray(new String[0])))
+                                .append("' in cdc source may cause wrong result or error on")
+                                .append(" downstream operators, please consider removing these")
+                                .append(" columns or use a non-cdc source that only has insert")
+                                .append(" messages.\nsource node:\n")
+                                .append(
+                                        FlinkRelOptUtil.toString(
+                                                tableScan,
+                                                SqlExplainLevel.DIGEST_ATTRIBUTES,
+                                                false,
+                                                true,
+                                                false,
+                                                true));
+                        throw new TableException(errorMsg.toString());
+                    }
+                }
+            }
+            return rel;
+        } else if (rel instanceof StreamPhysicalLegacyTableSourceScan
+                || rel instanceof StreamPhysicalDataStreamScan
+                || rel instanceof StreamPhysicalValues) {
+            // not cdc source, end visit
+            return rel;
+        } else if (rel instanceof StreamPhysicalGroupAggregateBase) {
+            // output row type = grouping keys + aggCalls
+            StreamPhysicalGroupAggregateBase groupAgg = (StreamPhysicalGroupAggregateBase) rel;
+            if (inputInsertOnly(groupAgg)) {

Review Comment:
   sigh... there's different logic for groupAgg, overAgg, windowAgg, regular join and source node


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
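
For readers following along, the NDU scenario from the class javadoc quoted above (an updating stream produced by "keep last row" deduplication feeding a group aggregate whose key contains DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'), with the sink keyed on c3 only) can be reproduced with a small pipeline along the following lines. This is a minimal illustrative sketch, not part of the PR: the class name, table names and the datagen/blackhole connectors are assumptions made only to keep it self-contained and runnable.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NduExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // insert-only source; datagen is used only so the sketch needs no external system
        tEnv.executeSql(
                "CREATE TABLE src (c1 BIGINT, c2 STRING, c3 STRING, pt AS PROCTIME()) "
                        + "WITH ('connector' = 'datagen')");

        // sink keyed on c3 only, like `Sink {pk=(c3)}` in the javadoc example
        tEnv.executeSql(
                "CREATE TABLE snk (c3 STRING, `day` STRING, cnt BIGINT, "
                        + "PRIMARY KEY (c3) NOT ENFORCED) "
                        + "WITH ('connector' = 'blackhole')");

        // The keep-last-row deduplication turns the stream into an updating one, so a
        // retraction re-evaluates DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') and may hit a
        // different group than the original insert; that is the NDU problem this visitor
        // detects (with the PR's strict handling, planning is expected to fail here).
        tEnv.executeSql(
                        "INSERT INTO snk "
                                + "SELECT c3, `day`, COUNT(*) AS cnt FROM ( "
                                + "  SELECT c3, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') AS `day` FROM ( "
                                + "    SELECT c1, c2, c3, "
                                + "      ROW_NUMBER() OVER (PARTITION BY c1, c2 ORDER BY pt DESC) AS rn "
                                + "    FROM src) t1 "
                                + "  WHERE rn = 1) t2 "
                                + "GROUP BY c3, `day`")
                .await();
    }
}

With the non-deterministic update handling introduced by this PR enabled in its strict mode, planning such a query should fail with a TableException pointing at the non-deterministic DATE_FORMAT(CURRENT_TIMESTAMP, ...) call rather than silently producing wrong results.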