[ https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15966535#comment-15966535 ]
ASF GitHub Bot commented on FLINK-6090: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3696#discussion_r110393126 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala --- @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.calcite.rel.RelNode + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +/** + * Collection of retraction rules that apply various transformations on DataStreamRel trees. + * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule and + * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule -> + * NeedToRetractProcessRule -> AccModeProcessRule). 
+ */ +object DataStreamRetractionRule { + + /** + * Singleton rule that init retraction trait inside a [[DataStreamRel]] + */ + val INIT_INSTANCE = new InitProcessRule() + + /** + * Singleton rule that decide needToRetract property inside a [[DataStreamRel]] + */ + val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule() + + /** + * Singleton rule that decide accMode inside a [[DataStreamRel]] + */ + val ACCMODE_INSTANCE = new AccModeProcessRule() + + /** + * Get all child RelNodes of a RelNode + * @param topRel The input RelNode + * @return All child nodes + */ + def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = { + val topRelInputs = new ListBuffer[RelNode]() + topRelInputs.++=(topRel.getInputs.asScala) + topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel) + } + + def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = { + val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE) + if (null == retractionTrait) { + false + } else { + retractionTrait.getNeedToRetract + } + } + + + /** + * Find all needToRetract nodes. A node needs to retract means that there are downstream + * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and + * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides, a + * needToRetract node also need retraction from it's upstream nodes. + */ + class NeedToRetractProcessRule extends RelOptRule( + operand( + classOf[DataStreamRel], none()), + "NeedToRetractProcessRule") { + + /** + * Return true if bottom RelNode does not contain needToRetract and top RelNode need + * retraction from bottom RelNode. Currently, operators which contain aggregations need + * retraction from upstream nodes, besides, a needToRetract node also needs retraction from + * it's upstream nodes. 
+ */ + def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = { + val bottomTraits = bottomRel.getTraitSet + if(!traitSetContainNeedToRetract(bottomTraits)){ + topRel match { + case _: DataStreamGroupAggregate => true + case _: DataStreamGroupWindowAggregate => true + case _: DataStreamOverAggregate => true + case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true + case _ => false + } + } else { + false + } + } + + /** + * Add needToRetract for the input RelNode + */ + def addNeedToRetract(relNode: RelNode): RelNode = { + val traitSet = relNode.getTraitSet + var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE) + if (null == retractionTrait) { + retractionTrait = new RetractionTrait(true, AccMode.Acc) + } else { + retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode) + } + + relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs) + } + + /** + * Returns a new topRel and a needTransform flag for a given topRel and bottomRels. The new + * topRel contains new bottomRels with needToRetract properly marked. The needTransform flag + * will be true if any transformation has been done. + * + * @param topRel The input top RelNode. + * @param bottomRels The input bottom RelNodes. 
+ * @return A tuple holding a new top RelNode and a needTransform flag + */ + def needToRetractProcess( + topRel: RelNode, + bottomRels: ListBuffer[RelNode]) + : (RelNode, Boolean) = { + + var needTransform = false + var i = 0 + while(i < bottomRels.size) { + val bottomRel = bottomRels(i) + if(bottomNeedToRetract(topRel, bottomRel)) { + needTransform = true + bottomRels(i) = addNeedToRetract(bottomRel) + } + i += 1 + } + val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava) --- End diff -- We only need to create the newTopRel if `needTransform == true`. > Add RetractionRule at the stage of decoration > --------------------------------------------- > > Key: FLINK-6090 > URL: https://issues.apache.org/jira/browse/FLINK-6090 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Hequn Cheng > > Implement the optimizer for retraction: > 1. Add a RetractionRule at the stage of decoration, which can derive the > replace table/append table and NeedRetraction properties. > 2. Match the NeedRetraction and replace table, and mark the accumulating mode. > > When this task is finished, we can turn on retraction for different operators > according to their accumulating mode. -- This message was sent by Atlassian JIRA (v6.3.15#6346)