[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735290#comment-15735290
 ] 

ASF GitHub Bot commented on FLINK-5220:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2923#discussion_r91709480
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
 ---
    @@ -0,0 +1,154 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.batch
    +
    +import org.apache.flink.api.common.io.GenericInputFormat
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
    +import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment 
=> JavaExecEnv}
    +import org.apache.flink.api.scala.ExecutionEnvironment
    +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
    +import 
org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.table.sources.{BatchTableSource, 
ProjectableTableSource}
    +import org.apache.flink.api.table.typeutils.RowTypeInfo
    +import org.apache.flink.api.table.{Row, TableEnvironment}
    +import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.{Before, Test}
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +
    +@RunWith(classOf[Parameterized])
    +class ProjectableTableSourceITCase(mode: TestExecutionMode,
    +  configMode: TableConfigMode)
    +  extends TableProgramsTestBase(mode, configMode) {
    +
    +  private val tableName = "MyTable"
    +  private var tableEnv: BatchTableEnvironment = null
    +
    +  @Before
    +  def initTableEnv(): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    tableEnv = TableEnvironment.getTableEnvironment(env, config)
    +    tableEnv.registerTableSource(tableName, new TestProjectableTableSource)
    +  }
    +
    +  @Test
    +  def testTableAPI(): Unit = {
    +    val results = tableEnv
    +      .scan(tableName)
    +      .where("amount < 4")
    +      .select("id, name")
    +      .collect()
    +
    +    val expected = Seq(
    +      "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", 
"16,Record_16",
    +      "17,Record_17", "18,Record_18", "19,Record_19", 
"32,Record_32").mkString("\n")
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +
    +  @Test
    +  def testSQL(): Unit = {
    +    val results = tableEnv
    +      .sql(s"select id, name from $tableName where amount < 4 ")
    +      .collect()
    +
    +    val expected = Seq(
    +      "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", 
"16,Record_16",
    +      "17,Record_17", "18,Record_18", "19,Record_19", 
"32,Record_32").mkString("\n")
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +}
    +
    +class TestProjectableTableSource(
    +  fieldTypes: Array[TypeInformation[_]],
    +  fieldNames: Array[String])
    +  extends BatchTableSource[Row] with ProjectableTableSource[Row] {
    +
    +  def this() = this(
    +    fieldTypes = Array(
    +      BasicTypeInfo.STRING_TYPE_INFO,
    +      BasicTypeInfo.LONG_TYPE_INFO,
    +      BasicTypeInfo.INT_TYPE_INFO,
    +      BasicTypeInfo.DOUBLE_TYPE_INFO),
    +    fieldNames = Array[String]("name", "id", "amount", "price")
    +  )
    +
    +  /** Returns the data of the table as a 
[[org.apache.flink.api.java.DataSet]]. */
    +  override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
    +    execEnv.createInput(new ProjectableInputFormat(33, fieldNames), 
getReturnType).setParallelism(1)
    --- End diff --
    
    I think the test can be simplified if we use `execEnv.fromCollection` and 
build the records in the collection depending on the projection. 
    We do not need a special `InputFormat` here.


> Flink SQL projection pushdown
> -----------------------------
>
>                 Key: FLINK-5220
>                 URL: https://issues.apache.org/jira/browse/FLINK-5220
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> The jira is to do projection pushdown optimization. Please refer to the
> design document for more details.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to