Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40536239
  
    --- Diff: 
flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicatePushdown.scala
 ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.expressions.analysis
    +
    +import org.apache.flink.api.table.expressions._
    +import org.apache.flink.api.table.expressions.analysis.FieldBacktracker
    +.resolveFieldNameAndTableSource
    +import 
org.apache.flink.api.table.expressions.analysis.PredicateFilter.pruneExpr
    +import org.apache.flink.api.table.input.{AdaptiveTableSource, TableSource}
    +import org.apache.flink.api.table.plan._
    +import org.apache.flink.api.table.trees.Rule
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +/**
    + * Pushes constant predicates (e.g. a===12 && b.isNotNull) to each 
corresponding
    + * AdaptiveTableSource that support predicates.
    + */
    +class PredicatePushdown(val inputOperation: PlanNode) extends 
Rule[Expression] {
    +
    +  def apply(expr: Expression) = {
    +    // get all table sources where predicates can be push into
    +    val tableSources = getPushableTableSources(inputOperation)
    +
    +    // prune expression tree such that it only contains constant predicates
    +    // such as a=1,a="Hello World", isNull(a) but not a=b
    +    val constantExpr = pruneExpr(isResolvedAndConstant, expr)
    +
    +    // push predicates to each table source respectively
    +    for (ts <- tableSources) {
    +      // prune expression tree such that it only contains field references 
of ts
    +      val tsExpr = pruneExpr((e) => isSameTableSource(e, ts), constantExpr)
    +
    +      // resolve field names to field names of the table source
    +      val result = tsExpr.transformPost {
    +        case rfr@ResolvedFieldReference(fieldName, typeInfo) =>
    +          ResolvedFieldReference(
    +            resolveFieldNameAndTableSource(inputOperation, fieldName)._2,
    +            typeInfo
    +          )
    +      }
    +      // push down predicates
    +      if (result != NopExpression()) {
    +        ts.notifyPredicates(result)
    +      }
    +    }
    +    expr
    +  }
    +
    +  // 
----------------------------------------------------------------------------------------------
    +
    +  /**
    +   * @return all AdaptiveTableSources the given PlanNode contains
    +   */
    +  def getPushableTableSources(tree: PlanNode): Seq[AdaptiveTableSource] = 
tree match {
    +    case Root(ts: AdaptiveTableSource, _) if 
ts.supportsPredicatePushdown() => Seq(ts)
    +    case pn:PlanNode =>
    +      val res = new ArrayBuffer[AdaptiveTableSource]()
    +      for (child <- pn.children) res ++= getPushableTableSources(child)
    +      res
    --- End diff --
    
    This can be replaced by:
    ```scala
    pn.children flatMap { child => getPushableTableSources(child ) }
    ```
    if I'm not mistaken, seems more Scala-y :smile: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to