
ASF GitHub Bot commented on FLINK-3849:

Github user KurtYoung commented on a diff in the pull request:

    --- Diff: 
    @@ -0,0 +1,382 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table
    +import org.apache.flink.table.api.Types
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.sources.{CsvTableSource, TableSource}
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.apache.flink.table.expressions.utils._
    +import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource}
    +import org.junit.{Assert, Test}
    +class TableSourceTest extends TableTestBase {
    +  private val projectedFields: Array[String] = Array("last", "id", "score")
    +  private val noCalcFields: Array[String] = Array("id", "score", "first")
    +  // batch plan
    +  @Test
    +  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
    +    val (tableSource, tableName) = csvTable
    +    val util = batchTestUtil()
    +    val tEnv = util.tEnv
    +    tEnv.registerTableSource(tableName, tableSource)
    +    val result = tEnv
    +      .scan(tableName)
    +      .select('last.upperCase(), 'id.floor(), 'score * 2)
    +    val expected = unaryNode(
    +      "DataSetCalc",
    +      batchSourceTableNode(tableName, projectedFields),
    +      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
    +    )
    +    util.verifyTable(result, expected)
    +  }
    +  @Test
    +  def testBatchProjectableSourceScanPlanSQL(): Unit = {
    +    val (tableSource, tableName) = csvTable
    +    val util = batchTestUtil()
    +    util.tEnv.registerTableSource(tableName, tableSource)
    +    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
    +    val expected = unaryNode(
    +      "DataSetCalc",
    +      batchSourceTableNode(tableName, projectedFields),
    +      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
    +    )
    +    util.verifySql(sqlQuery, expected)
    +  }
    +  @Test
    +  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
    +    val (tableSource, tableName) = csvTable
    +    val util = batchTestUtil()
    +    val tEnv = util.tEnv
    +    tEnv.registerTableSource(tableName, tableSource)
    +    val result = tEnv
    +      .scan(tableName)
    +      .select('id, 'score, 'first)
    +    val expected = batchSourceTableNode(tableName, noCalcFields)
    +    util.verifyTable(result, expected)
    +  }
    +  @Test
    +  def testBatchFilterableWithoutPushDown(): Unit = {
    +    val (tableSource, tableName) = filterableTableSource
    +    val util = batchTestUtil()
    +    val tEnv = util.tEnv
    +    tEnv.registerTableSource(tableName, tableSource)
    +    val result = tEnv
    +        .scan(tableName)
    +        .select('price, 'id, 'amount)
    +        .where("price * 2 < 32")
    +    val expected = unaryNode(
    +      "DataSetCalc",
    +      batchSourceTableNode(
    +        tableName,
    +        Array("name", "id", "amount", "price")),
    +      term("select", "price", "id", "amount"),
    +      term("where", "<(*(price, 2), 32)")
    +    )
    +    util.verifyTable(result, expected)
    +  }
    +  @Test
    +  def testBatchFilterablePartialPushDown(): Unit = {
    +    val (tableSource, tableName) = filterableTableSource
    +    val util = batchTestUtil()
    +    val tEnv = util.tEnv
    +    tEnv.registerTableSource(tableName, tableSource)
    +    val result = tEnv
    +      .scan(tableName)
    +      .where("amount > 2 && price * 2 < 32")
    +      .select('price, 'name.lowerCase(), 'amount)
    +    val expected = unaryNode(
    +      "DataSetCalc",
    +      batchFilterableSourceTableNode(
    +        tableName,
    +        Array("name", "id", "amount", "price"),
    +        "'amount > 2"),
    +      term("select", "price", "LOWER(name) AS _c1", "amount"),
    +      term("where", "<(*(price, 2), 32)")
    +    )
    +    util.verifyTable(result, expected)
    +  }
    +  @Test
    +  def testBatchFilterableFullyPushedDown(): Unit = {
    +    val (tableSource, tableName) = filterableTableSource
    +    val util = batchTestUtil()
    +    val tEnv = util.tEnv
    +    tEnv.registerTableSource(tableName, tableSource)
    +    val result = tEnv
    +        .scan(tableName)
    +        .select('price, 'id, 'amount)
    +        .where("amount > 2 && amount < 32")
    +    val expected = unaryNode(
    +      "DataSetCalc",
    +      batchFilterableSourceTableNode(
    +        tableName,
    +        Array("name", "id", "amount", "price"),
    +        "'amount > 2 && 'amount < 32"),
    +      term("select", "price", "id", "amount")
    +    )
    +    util.verifyTable(result, expected)
    +  }
    +  @Test
    +  def testBatchFilterableWithUnconvertedExpression(): Unit = {
    +    val (tableSource, tableName) = filterableTableSource
    +    val util = batchTestUtil()
    +    val tEnv = util.tEnv
    +    tEnv.registerTableSource(tableName, tableSource)
    +    val result = tEnv
    +        .scan(tableName)
    +        .select('price, 'id, 'amount)
    +        .where("amount > 2 && (amount < 32 || amount.cast(LONG) > 10)") // cast can not be converted
    +    val expected = unaryNode(
    +      "DataSetCalc",
    +      batchFilterableSourceTableNode(
    +        tableName,
    +        Array("name", "id", "amount", "price"),
    +        "'amount > 2"),
    +      term("select", "price", "id", "amount"),
    +      term("where", "OR(<(amount, 32), >(CAST(amount), 10))")
    +    )
    +    util.verifyTable(result, expected)
    +  }
    +  @Test
    +  def testBatchFilterableWithUDF(): Unit = {
    +    val (tableSource, tableName) = filterableTableSource
    +    val util = batchTestUtil()
    +    val tEnv = util.tEnv
    +    tEnv.registerTableSource(tableName, tableSource)
    +    tEnv.registerFunction("func0", Func0)
    +    val result = tEnv
    +        .scan(tableName)
    +        .select('price, 'id, 'amount)
    +        .where("amount > 2 && func0(amount) < 32")
    +    // we don't fail during the optimization
    +    tEnv.optimize(result.getRelNode)
    --- End diff --
    Yes, I added the check using the function's identifier.

> Add FilterableTableSource interface and translation rule
> --------------------------------------------------------
>                 Key: FLINK-3849
>                 URL: https://issues.apache.org/jira/browse/FLINK-3849
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Kurt Young
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.

This message was sent by Atlassian JIRA

Reply via email to