[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925441#comment-15925441 ]
ASF GitHub Bot commented on FLINK-3849: --------------------------------------- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106073030 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** + * Extracts the indices of input fields which accessed by the RexProgram. 
+ * + * @param rexProgram The RexProgram to analyze + * @return The indices of accessed input fields + */ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { + val visitor = new InputRefVisitor + + // extract referenced input fields from projections + rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + + // extract referenced input fields from condition + val condition = rexProgram.getCondition + if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) + } + + visitor.getFields + } + + /** + * Extract condition from RexProgram and convert it into independent CNF expressions. + * + * @param rexProgram The RexProgram to analyze + * @return converted expressions as well as RexNodes which cannot be translated + */ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + + rexProgram.getCondition match { + case condition: RexLocalRef => + val expanded = rexProgram.expandLocalRef(condition) + // converts the expanded expression to conjunctive normal form, + // like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" + val cnf = RexUtil.toCnf(rexBuilder, expanded) + // converts the cnf condition to a list of AND conditions + val conjunctions = RelOptUtil.conjunctions(cnf) + + val convertedExpressions = new mutable.ArrayBuffer[Expression] + val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] + val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray + val converter = new ConvertToExpression(inputNames, catalog) + + conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { + case Some(expression) => convertedExpressions += expression + case None => unconvertedRexNodes += rex + } + }) + (convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) + } + } +} + +/** + * An RexVisitor to extract all referenced 
input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef: RexInputRef): Unit = + fields += inputRef.getIndex + + override def visitCall(call: RexCall): Unit = + call.operands.foreach(operand => operand.accept(this)) +} + +/** + * An RexVisitor to convert RexNode to Expression. + * + * @param inputNames The input names of the relation node + * @param functionCatalog The function catalog + */ +class ConvertToExpression( + inputNames: Array[String], + functionCatalog: FunctionCatalog) + extends RexVisitor[Option[Expression]] { + + override def visitInputRef(inputRef: RexInputRef): Option[Expression] = { + Preconditions.checkArgument(inputRef.getIndex < inputNames.length) + Option(ResolvedFieldReference( + inputNames(inputRef.getIndex), + FlinkTypeFactory.toTypeInfo(inputRef.getType) + )) + } + + override def visitLocalRef(localRef: RexLocalRef): Option[Expression] = { + throw new TableException("Bug: RefLocalRef should have been expanded") + } + + override def visitLiteral(literal: RexLiteral): Option[Expression] = { + Option(Literal(literal.getValue, FlinkTypeFactory.toTypeInfo(literal.getType))) + } + + override def visitCall(call: RexCall): Option[Expression] = { + val operands = call.getOperands.map( + operand => operand.accept(this).orNull + ) + + // return null if we cannot translate all the operands of the call + if (operands.contains(null)) { + Option.empty --- End diff -- I think, `None` is more brief than `Option.empty` > Add FilterableTableSource interface and translation rule > -------------------------------------------------------- > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} 
implementations > which support filter push-down. > The interface could look as follows: > {code} > trait FilterableTableSource { > // returns the unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition, we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)