wuchong commented on a change in pull request #11674: [FLINK-16887][table-planner-blink] Refactor retraction rules to support inferring ChangelogMode URL: https://github.com/apache/flink/pull/11674#discussion_r407320537
########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala ########## @@ -0,0 +1,595 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.optimize.program + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNoUpdate, onlyAfterOrNoUpdate} +import org.apache.flink.table.planner.plan.`trait`._ +import org.apache.flink.table.planner.plan.nodes.physical.stream._ +import org.apache.flink.table.planner.plan.utils._ +import org.apache.flink.table.planner.sinks.DataStreamTableSink +import org.apache.flink.table.runtime.operators.join.FlinkJoinType +import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, StreamTableSink, UpsertStreamTableSink} + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.util.ImmutableBitSet + +import scala.collection.JavaConversions._ + +/** + * An optimize program to infer ChangelogMode for every physical nodes. 
+ */ +class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOptimizeContext] { + + private val SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR = new SatisfyModifyKindSetTraitVisitor + private val SATISFY_UPDATE_KIND_TRAIT_VISITOR = new SatisfyUpdateKindTraitVisitor + + override def optimize( + root: RelNode, + context: StreamOptimizeContext): RelNode = { + + // step1: satisfy ModifyKindSet trait + val physicalRoot = root.asInstanceOf[StreamPhysicalRel] + val rootWithModifyKindSet = SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR.visit( + physicalRoot, + // we do not propagate the ModifyKindSet requirement and requester among blocks + // set default ModifyKindSet requirement and requester for root + ModifyKindSetTrait.ALL_CHANGES, + "ROOT") + + // step2: satisfy UpdateKind trait + val rootModifyKindSet = getModifyKindSet(rootWithModifyKindSet) + // use the required UpdateKindTrait from parent blocks + val requiredUpdateKindTrait = if (context.isUpdateBeforeRequired) { + UpdateKindTrait.BEFORE_AND_AFTER + } else if (rootModifyKindSet.isInsertOnly) { + UpdateKindTrait.NO_UPDATE + } else { + UpdateKindTrait.ONLY_UPDATE_AFTER + } + val finalRoot = SATISFY_UPDATE_KIND_TRAIT_VISITOR.visit( + rootWithModifyKindSet, + requiredUpdateKindTrait) + + // step3: sanity check and return non-empty root + if (finalRoot.isEmpty) { + val plan = FlinkRelOptUtil.toString(root, withChangelogTraits = true) + throw new TableException( + "Can't generate a valid execution plan for the given query:\n" + plan) + } else { + finalRoot.get + } + } + + + /** + * A visitor which will try to satisfy the required [[ModifyKindSetTrait]] from root. + * + * <p>After traversed by this visitor, every node should have a correct [[ModifyKindSetTrait]] + * or an exception should be thrown if the planner doesn't support to satisfy the required + * [[ModifyKindSetTrait]]. + */ + private class SatisfyModifyKindSetTraitVisitor { + + /** + * Try to satisfy the required [[ModifyKindSetTrait]] from root. 
+ * + * <p>Each node should first require a [[ModifyKindSetTrait]] to its children. + * If the trait provided by children does not satisfy the required one, + * it should throw an exception and prompt the user that plan is not supported. + * The required [[ModifyKindSetTrait]] may come from the node's parent, + * or come from the node itself, depending on whether the node will destroy + * the trait provided by children or pass the trait from children. + * + * <p>Each node should provide [[ModifyKindSetTrait]] according to current node behavior + * and the ModifyKindSetTrait provided by children. + * + * @param rel the node who should satisfy the requiredTrait + * @param requiredTrait the required ModifyKindSetTrait + * @param requester the requester who starts the requirement, used for better exception message + * @return A converted node which satisfy required traits by inputs node of current node. + * Or throws exception if required trait can’t be satisfied. + */ + def visit( + rel: StreamPhysicalRel, + requiredTrait: ModifyKindSetTrait, + requester: String): StreamPhysicalRel = rel match { + case sink: StreamExecSink[_] => + val (sinkRequiredTrait, name) = sink.sink match { + case _: UpsertStreamTableSink[_] => + (ModifyKindSetTrait.ALL_CHANGES, "UpsertStreamTableSink") + case _: RetractStreamTableSink[_] => + (ModifyKindSetTrait.ALL_CHANGES, "RetractStreamTableSink") + case _: AppendStreamTableSink[_] => + (ModifyKindSetTrait.INSERT_ONLY, "AppendStreamTableSink") + case _: StreamTableSink[_] => + (ModifyKindSetTrait.INSERT_ONLY, "StreamTableSink") + case ds: DataStreamTableSink[_] => + if (ds.withChangeFlag) { + (ModifyKindSetTrait.ALL_CHANGES, "toRetractStream") + } else { + (ModifyKindSetTrait.INSERT_ONLY, "toAppendStream") + } + case _ => + throw new UnsupportedOperationException( + s"Unsupported sink '${sink.sink.getClass.getSimpleName}'") + } + val children = visitChildren(sink, sinkRequiredTrait, name) + val sinkTrait = 
sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY) + // ignore required trait from context, because sink is the true root + sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel] + + case deduplicate: StreamExecDeduplicate => + // deduplicate only support insert only as input + val children = visitChildren(deduplicate, ModifyKindSetTrait.INSERT_ONLY) + val providedTrait = if (deduplicate.keepLastRow) { + // produce updates if it keeps last row + ModifyKindSetTrait.ALL_CHANGES + } else { + ModifyKindSetTrait.INSERT_ONLY + } + replaceChildrenAndTrait(deduplicate, children, providedTrait, requiredTrait, requester) + + case agg: StreamExecGroupAggregate => + // agg support all changes in input + val children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES) + val inputModifyKindSet = getModifyKindSet(children.head) + val builder = ModifyKindSet.newBuilder() + .addContainedKind(ModifyKind.INSERT) + .addContainedKind(ModifyKind.UPDATE) + if (inputModifyKindSet.contains(ModifyKind.UPDATE) || + inputModifyKindSet.contains(ModifyKind.DELETE)) { + builder.addContainedKind(ModifyKind.DELETE) + } + val providedTrait = new ModifyKindSetTrait(builder.build()) + replaceChildrenAndTrait(agg, children, providedTrait, requiredTrait, requester) + + case tagg: StreamExecGroupTableAggregate => + // table agg support all changes in input + val children = visitChildren(tagg, ModifyKindSetTrait.ALL_CHANGES) + // table aggregate will produce all changes, including deletions + replaceChildrenAndTrait( + tagg, children, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester) + + case window: StreamExecGroupWindowAggregateBase => + // WindowAggregate and WindowTableAggregate support insert-only in input + val children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY) + val builder = ModifyKindSet.newBuilder() + .addContainedKind(ModifyKind.INSERT) + if (window.emitStrategy.produceUpdates) { + builder.addContainedKind(ModifyKind.UPDATE) + } + val providedTrait = new 
ModifyKindSetTrait(builder.build()) + replaceChildrenAndTrait(window, children, providedTrait, requiredTrait, requester) + + case limit: StreamExecLimit => + // limit support all changes in input + val children = visitChildren(limit, ModifyKindSetTrait.ALL_CHANGES) + val providedTrait = if (getModifyKindSet(children.head).isInsertOnly) { + ModifyKindSetTrait.INSERT_ONLY + } else { + ModifyKindSetTrait.ALL_CHANGES + } + replaceChildrenAndTrait(limit, children, providedTrait, requiredTrait, requester) + + case _: StreamExecRank | _: StreamExecSortLimit => + // Rank supports consuming all changes + val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES) + replaceChildrenAndTrait( + rel, children, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester) + + case sort: StreamExecSort => + // Sort supports consuming all changes + val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES) + // Sort will buffer all inputs, and produce insert-only messages when input is finished + replaceChildrenAndTrait( + sort, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester) + + case cep: StreamExecMatch => + // CEP only supports consuming insert-only and producing insert-only changes + // give a better requester name for exception message + val children = visitChildren(cep, ModifyKindSetTrait.INSERT_ONLY, "Match Recognize") + replaceChildrenAndTrait( + cep, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester) + + case _: StreamExecTemporalSort | _: StreamExecOverAggregate | _: StreamExecWindowJoin => + // TemporalSort, OverAggregate, WindowJoin only support consuming insert-only + // and producing insert-only changes + val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY) + replaceChildrenAndTrait( + rel, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester) + + case join: StreamExecJoin => + // join support all changes in input + val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES) + val 
leftKindSet = getModifyKindSet(children.head) + val rightKindSet = getModifyKindSet(children.last) + val innerOrSemi = join.flinkJoinType == FlinkJoinType.INNER || + join.flinkJoinType == FlinkJoinType.SEMI + val providedTrait = if (leftKindSet.isInsertOnly && + rightKindSet.isInsertOnly && innerOrSemi) { + // produce insert-only because results are deterministic + ModifyKindSetTrait.INSERT_ONLY + } else { + // otherwise, it may produce any kinds of changes + ModifyKindSetTrait.ALL_CHANGES + } + replaceChildrenAndTrait(join, children, providedTrait, requiredTrait, requester) + + case temporalJoin: StreamExecTemporalJoin => + // currently, temporal join only support insert-only input streams, including right side + val children = visitChildren(temporalJoin, ModifyKindSetTrait.INSERT_ONLY) + // forward left input changes + val leftTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE) + replaceChildrenAndTrait(temporalJoin, children, leftTrait, requiredTrait, requester) + + case _: StreamExecCalc | _: StreamExecPythonCalc | _: StreamExecCorrelate | + _: StreamExecPythonCorrelate | _: StreamExecLookupJoin | _: StreamExecExchange | + _: StreamExecExpand | _: StreamExecMiniBatchAssigner | + _: StreamExecWatermarkAssigner => + // transparent forward requiredTrait to children + val children = visitChildren(rel, requiredTrait, requester) + val childrenTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE) + // forward children mode + replaceChildrenAndTrait(rel, children, childrenTrait, requiredTrait, requester) + + case union: StreamExecUnion => + // transparent forward requiredTrait to children + val children = visitChildren(rel, requiredTrait, requester) + // union provides all possible kinds of children have + val providedKindSet = ModifyKindSet.union(children.map(getModifyKindSet): _*) + replaceChildrenAndTrait( + union, children, new ModifyKindSetTrait(providedKindSet), requiredTrait, requester) + + case _: 
StreamExecDataStreamScan | _: StreamExecTableSourceScan | _: StreamExecValues => + // DataStream, TableSource and Values only support producing insert-only messages + replaceChildrenAndTrait( + rel, List(), ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester) + + case scan: StreamExecIntermediateTableScan => + val providedTrait = new ModifyKindSetTrait(scan.intermediateTable.modifyKindSet) + replaceChildrenAndTrait(scan, List(), providedTrait, requiredTrait, requester) + + case _ => + throw new UnsupportedOperationException( + s"Unsupported visit for ${rel.getClass.getSimpleName}") + } + + private def visitChildren( + parent: StreamPhysicalRel, + requiredChildrenTrait: ModifyKindSetTrait): List[StreamPhysicalRel] = { + visitChildren(parent, requiredChildrenTrait, getNodeName(parent)) + } + + private def visitChildren( + parent: StreamPhysicalRel, + requiredChildrenTrait: ModifyKindSetTrait, + requester: String): List[StreamPhysicalRel] = { + val newChildren = for (i <- 0 until parent.getInputs.size()) yield { + visitChild(parent, i, requiredChildrenTrait, requester) + } + newChildren.toList + } + + private def visitChild( + parent: StreamPhysicalRel, + childOrdinal: Int, + requiredChildTrait: ModifyKindSetTrait, + requester: String): StreamPhysicalRel = { + val child = parent.getInput(childOrdinal).asInstanceOf[StreamPhysicalRel] + this.visit(child, requiredChildTrait, requester) + } + + private def getNodeName(rel: StreamPhysicalRel): String = { + val prefix = "StreamExec" + val typeName = rel.getRelTypeName + if (typeName.startsWith(prefix)) { + typeName.substring(prefix.length) + } else { + typeName + } + } + + private def replaceChildrenAndTrait( + node: StreamPhysicalRel, + children: List[StreamPhysicalRel], + providedTrait: ModifyKindSetTrait, + requiredTrait: ModifyKindSetTrait, + requestedOwner: String): StreamPhysicalRel = { + if (!providedTrait.satisfies(requiredTrait)) { + val diff = providedTrait.modifyKindSet.diff(requiredTrait.modifyKindSet) + 
val diffString = diff.getContainedKinds + .toList.sorted // for deterministic error message + .map(_.toString.toLowerCase) + .mkString(" and ") + // creates a new node based on the new children, to have a more correct node description + // e.g. description of GroupAggregate is based on the ModifyKindSetTrait of children + val tempNode = node.copy(node.getTraitSet, children).asInstanceOf[StreamPhysicalRel] + val nodeString = tempNode.getRelDetailedDescription + throw new TableException( + s"$requestedOwner doesn't support consuming $diffString changes " + + s"which is produced by node $nodeString") + } + val newTraitSet = node.getTraitSet.plus(providedTrait) + node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel] + } + } + + /** + * A visitor which will try to satisfy the required [[UpdateKindTrait]] from root. + * + * <p>After traversed by this visitor, every node should have a correct [[UpdateKindTrait]] + * or returns None if the planner doesn't support to satisfy the required [[UpdateKindTrait]]. + */ + private class SatisfyUpdateKindTraitVisitor { + + /** + * Try to satisfy the required [[UpdateKindTrait]] from root. + * + * <p>Each node will first require a UpdateKindTrait to its children. + * The required UpdateKindTrait may come from the node's parent, + * or come from the node itself, depending on whether the node will destroy + * the trait provided by children or pass the trait from children. + * + * <p>If the node will pass the children's UpdateKindTrait without destroying it, + * then return a new node with new inputs and forwarded UpdateKindTrait. + * + * <p>If the node will destroy the children's UpdateKindTrait, then the node itself + * needs to be converted, or a new node should be generated to satisfy the required trait, + * such as marking itself not to generate UPDATE_BEFORE, + * or generating a new node to filter UPDATE_BEFORE. 
+ * + * @param rel the node who should satisfy the requiredTrait + * @param requiredTrait the required UpdateKindTrait + * @return A converted node which satisfy required traits by inputs node of current node. + * Or None if required traits cannot be satisfied. + */ + def visit( + rel: StreamPhysicalRel, + requiredTrait: UpdateKindTrait): Option[StreamPhysicalRel] = rel match { + case sink: StreamExecSink[_] => + val childModifyKindSet = getModifyKindSet(sink.getInput) + val sinkRequiredTraits = sink.sink match { + case _: UpsertStreamTableSink[_] => + // support both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER + Seq(onlyAfterOrNoUpdate(childModifyKindSet), beforeAfterOrNoUpdate(childModifyKindSet)) + case _: RetractStreamTableSink[_] => + Seq(beforeAfterOrNoUpdate(childModifyKindSet)) + case _: AppendStreamTableSink[_] | _: StreamTableSink[_] => + Seq(UpdateKindTrait.NO_UPDATE) + case ds: DataStreamTableSink[_] => + if (ds.withChangeFlag) { + if (ds.needUpdateBefore) { + Seq(beforeAfterOrNoUpdate(childModifyKindSet)) + } else { + // support both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER + Seq( + onlyAfterOrNoUpdate(childModifyKindSet), + beforeAfterOrNoUpdate(childModifyKindSet)) + } + } else { + Seq(UpdateKindTrait.NO_UPDATE) + } + } + val children = sinkRequiredTraits.flatMap(t => visitChildren(sink, t)) + if (children.isEmpty) { + None + } else { + val sinkTrait = sink.getTraitSet.plus(UpdateKindTrait.NO_UPDATE) + Some(sink.copy(sinkTrait, children.head).asInstanceOf[StreamPhysicalRel]) + } + + case _: StreamExecGroupAggregate | _: StreamExecGroupTableAggregate | + _: StreamExecLimit => + // Aggregate, TableAggregate and Limit requires update_before if there are updates + val requiredChildTrait = beforeAfterOrNoUpdate(getModifyKindSet(rel.getInput(0))) + val children = visitChildren(rel, requiredChildTrait) + // use requiredTrait as providedTrait, because they should support all kinds of UpdateKind + replaceChildrenAndTrait(rel, children, 
requiredTrait) + + case _: StreamExecGroupWindowAggregate | _: StreamExecGroupWindowTableAggregate | + _: StreamExecDeduplicate | _: StreamExecTemporalSort | _: StreamExecMatch | + _: StreamExecOverAggregate | _: StreamExecWindowJoin => + // WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP, OverAggregate + // and WindowJoin require no updates in input + val children = visitChildren(rel, UpdateKindTrait.NO_UPDATE) + replaceChildrenAndTrait(rel, children, requiredTrait) + + case rank: StreamExecRank => + val rankStrategies = RankProcessStrategy.analyzeRankProcessStrategies( + rank.getInput, rank.partitionKey, rank.orderKey) + // creates new Rank nodes for every applied RankStrategy + val newRankNodes = rankStrategies.map(rank.copy) Review comment: I added an `applyRankStrategy: RankProcessStrategy => StreamPhysicalRel` parameter to the method `visitRankStrategies` to make it possible to move `rank.copy` into this method. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services