[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15929280#comment-15929280 ]
ASF GitHub Bot commented on FLINK-3849: --------------------------------------- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106566787 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramTestBase.scala --- @@ -16,106 +16,96 @@ * limitations under the License. */ -package org.apache.flink.table.plan.rules.util +package org.apache.flink.table.plan.util import java.math.BigDecimal +import java.util import org.apache.calcite.adapter.java.JavaTypeFactory import org.apache.calcite.jdbc.JavaTypeFactoryImpl import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem} import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder} -import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR} +import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR, BOOLEAN} import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._ -import org.junit.Assert.{assertArrayEquals, assertTrue} -import org.junit.{Before, Test} import scala.collection.JavaConverters._ +import scala.collection.mutable -/** - * This class is responsible for testing RexProgramProjectExtractor. 
- */ -class RexProgramProjectExtractorTest { - private var typeFactory: JavaTypeFactory = _ - private var rexBuilder: RexBuilder = _ - private var allFieldTypes: Seq[RelDataType] = _ - private val allFieldNames = List("name", "id", "amount", "price") - - @Before - def setUp(): Unit = { - typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT) - rexBuilder = new RexBuilder(typeFactory) - allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_)) - } +abstract class RexProgramTestBase { - @Test - def testExtractRefInputFields(): Unit = { - val usedFields = extractRefInputFields(buildRexProgram()) - assertArrayEquals(usedFields, Array(2, 3, 1)) - } + val typeFactory: JavaTypeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT) + + val allFieldNames: util.List[String] = List("name", "id", "amount", "price", "flag").asJava + + val allFieldTypes: util.List[RelDataType] = + List(VARCHAR, BIGINT, INTEGER, DOUBLE, BOOLEAN).map(typeFactory.createSqlType).asJava + + var rexBuilder: RexBuilder = new RexBuilder(typeFactory) - @Test - def testRewriteRexProgram(): Unit = { - val originRexProgram = buildRexProgram() - assertTrue(extractExprStrList(originRexProgram).sameElements(Array( - "$0", - "$1", - "$2", - "$3", - "*($t2, $t3)", - "100", - "<($t4, $t5)", - "6", - ">($t1, $t7)", - "AND($t6, $t8)"))) - // use amount, id, price fields to create a new RexProgram - val usedFields = Array(2, 3, 1) - val types = usedFields.map(allFieldTypes(_)).toList.asJava - val names = usedFields.map(allFieldNames(_)).toList.asJava - val inputRowType = typeFactory.createStructType(types, names) - val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder) - assertTrue(extractExprStrList(newRexProgram).sameElements(Array( - "$0", - "$1", - "$2", - "*($t0, $t1)", - "100", - "<($t3, $t4)", - "6", - ">($t2, $t6)", - "AND($t5, $t7)"))) + /** + * extract all expression string list from input RexProgram expression lists + * 
+ * @param rexProgram input RexProgram instance to analyze + * @return all expression string list of input RexProgram expression lists + */ + protected def extractExprStrList(rexProgram: RexProgram): mutable.Buffer[String] = { + rexProgram.getExprList.asScala.map(_.toString) } - private def buildRexProgram(): RexProgram = { - val types = allFieldTypes.asJava - val names = allFieldNames.asJava - val inputRowType = typeFactory.createStructType(types, names) + // select amount, amount * price as total where amount * price < 100 and id > 6 + protected def buildSimpleRexProgram1(): RexProgram = { + val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames) val builder = new RexProgramBuilder(inputRowType, rexBuilder) - val t0 = rexBuilder.makeInputRef(types.get(2), 2) - val t1 = rexBuilder.makeInputRef(types.get(1), 1) - val t2 = rexBuilder.makeInputRef(types.get(3), 3) + + val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2) + val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1) + val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3) val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2)) val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L)) val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L)) - // project: amount, amount * price + + // project: amount, amount * price as total builder.addProject(t0, "amount") builder.addProject(t3, "total") + // condition: amount * price < 100 and id > 6 val t6 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t3, t4)) val t7 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t5)) val t8 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(t6, t7).asJava)) builder.addCondition(t8) + builder.getProgram } - /** - * extract all expression string list from input RexProgram expression lists - * - * @param rexProgram input RexProgram instance to analyze - * @return all expression string list of input 
RexProgram expression lists - */ - private def extractExprStrList(rexProgram: RexProgram) = { - rexProgram.getExprList.asScala.map(_.toString) + // select amount, amount * price as total + // where (amount * price < 100) AND (id > 6) + protected def buildSimpleRexProgram2(): RexProgram = { --- End diff -- Oh, my bad, they are the same; I will remove one of them. > Add FilterableTableSource interface and translation rule > -------------------------------------------------------- > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows: > {code} > trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)