Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1127#discussion_r40536239 --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicatePushdown.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions.analysis + +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.expressions.analysis.FieldBacktracker +.resolveFieldNameAndTableSource +import org.apache.flink.api.table.expressions.analysis.PredicateFilter.pruneExpr +import org.apache.flink.api.table.input.{AdaptiveTableSource, TableSource} +import org.apache.flink.api.table.plan._ +import org.apache.flink.api.table.trees.Rule + +import scala.collection.mutable.ArrayBuffer + +/** + * Pushes constant predicates (e.g. a===12 && b.isNotNull) to each corresponding + * AdaptiveTableSource that support predicates. 
+ */ +class PredicatePushdown(val inputOperation: PlanNode) extends Rule[Expression] { + + def apply(expr: Expression) = { + // get all table sources where predicates can be push into + val tableSources = getPushableTableSources(inputOperation) + + // prune expression tree such that it only contains constant predicates + // such as a=1,a="Hello World", isNull(a) but not a=b + val constantExpr = pruneExpr(isResolvedAndConstant, expr) + + // push predicates to each table source respectively + for (ts <- tableSources) { + // prune expression tree such that it only contains field references of ts + val tsExpr = pruneExpr((e) => isSameTableSource(e, ts), constantExpr) + + // resolve field names to field names of the table source + val result = tsExpr.transformPost { + case rfr@ResolvedFieldReference(fieldName, typeInfo) => + ResolvedFieldReference( + resolveFieldNameAndTableSource(inputOperation, fieldName)._2, + typeInfo + ) + } + // push down predicates + if (result != NopExpression()) { + ts.notifyPredicates(result) + } + } + expr + } + + // ---------------------------------------------------------------------------------------------- + + /** + * @return all AdaptiveTableSources the given PlanNode contains + */ + def getPushableTableSources(tree: PlanNode): Seq[AdaptiveTableSource] = tree match { + case Root(ts: AdaptiveTableSource, _) if ts.supportsPredicatePushdown() => Seq(ts) + case pn:PlanNode => + val res = new ArrayBuffer[AdaptiveTableSource]() + for (child <- pn.children) res ++= getPushableTableSources(child) + res --- End diff -- This can be replaced by: ```scala pn.children flatMap { child => getPushableTableSources(child) } ``` if I'm not mistaken; it seems more Scala-y :smile:
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---