[ https://issues.apache.org/jira/browse/FLINK-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14933110#comment-14933110 ]
ASF GitHub Bot commented on FLINK-2167: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1127#discussion_r40536239 --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicatePushdown.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions.analysis + +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.expressions.analysis.FieldBacktracker +.resolveFieldNameAndTableSource +import org.apache.flink.api.table.expressions.analysis.PredicateFilter.pruneExpr +import org.apache.flink.api.table.input.{AdaptiveTableSource, TableSource} +import org.apache.flink.api.table.plan._ +import org.apache.flink.api.table.trees.Rule + +import scala.collection.mutable.ArrayBuffer + +/** + * Pushes constant predicates (e.g. a===12 && b.isNotNull) to each corresponding + * AdaptiveTableSource that support predicates. 
+ */ +class PredicatePushdown(val inputOperation: PlanNode) extends Rule[Expression] { + + def apply(expr: Expression) = { + // get all table sources where predicates can be push into + val tableSources = getPushableTableSources(inputOperation) + + // prune expression tree such that it only contains constant predicates + // such as a=1,a="Hello World", isNull(a) but not a=b + val constantExpr = pruneExpr(isResolvedAndConstant, expr) + + // push predicates to each table source respectively + for (ts <- tableSources) { + // prune expression tree such that it only contains field references of ts + val tsExpr = pruneExpr((e) => isSameTableSource(e, ts), constantExpr) + + // resolve field names to field names of the table source + val result = tsExpr.transformPost { + case rfr@ResolvedFieldReference(fieldName, typeInfo) => + ResolvedFieldReference( + resolveFieldNameAndTableSource(inputOperation, fieldName)._2, + typeInfo + ) + } + // push down predicates + if (result != NopExpression()) { + ts.notifyPredicates(result) + } + } + expr + } + + // ---------------------------------------------------------------------------------------------- + + /** + * @return all AdaptiveTableSources the given PlanNode contains + */ + def getPushableTableSources(tree: PlanNode): Seq[AdaptiveTableSource] = tree match { + case Root(ts: AdaptiveTableSource, _) if ts.supportsPredicatePushdown() => Seq(ts) + case pn:PlanNode => + val res = new ArrayBuffer[AdaptiveTableSource]() + for (child <- pn.children) res ++= getPushableTableSources(child) + res --- End diff -- This can be replaced by: ```scala pn.children flatMap { child => getPushableTableSources(child) } ``` if I'm not mistaken; that seems more Scala-y :smile: > Add fromHCat() to TableEnvironment > ---------------------------------- > > Key: FLINK-2167 > URL: https://issues.apache.org/jira/browse/FLINK-2167 > Project: Flink > Issue Type: New Feature > Components: Table API > Affects Versions: 0.9 > Reporter: Fabian Hueske > Assignee: Timo 
Walther > Priority: Minor > Labels: starter > > Add a {{fromHCat()}} method to the {{TableEnvironment}} to read a {{Table}} > from an HCatalog table. > The implementation could reuse Flink's HCatInputFormat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)