[ https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15739102#comment-15739102 ]
ASF GitHub Bot commented on FLINK-5220: --------------------------------------- Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2923#discussion_r91847990 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.batch + +import org.apache.flink.api.common.io.GenericInputFormat +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv} +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit.{Before, Test} +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class ProjectableTableSourceITCase(mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + private val tableName = "MyTable" + private var tableEnv: BatchTableEnvironment = null + + @Before + def initTableEnv(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + tableEnv = TableEnvironment.getTableEnvironment(env, config) + tableEnv.registerTableSource(tableName, new TestProjectableTableSource) + } + + @Test + def testTableAPI(): Unit = { + val results = tableEnv + .scan(tableName) + .where("amount < 4") + .select("id, name") + .collect() + + val expected = Seq( + "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16", + "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + + @Test + def testSQL(): Unit = { + val results = tableEnv + .sql(s"select id, name 
from $tableName where amount < 4 ") + .collect() + + val expected = Seq( + "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16", + "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + +} + +class TestProjectableTableSource( + fieldTypes: Array[TypeInformation[_]], + fieldNames: Array[String]) + extends BatchTableSource[Row] with ProjectableTableSource[Row] { + + def this() = this( + fieldTypes = Array( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO), + fieldNames = Array[String]("name", "id", "amount", "price") + ) + + /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */ + override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = { + execEnv.createInput(new ProjectableInputFormat(33, fieldNames), getReturnType).setParallelism(1) --- End diff -- yes, thanks > Flink SQL projection pushdown > ----------------------------- > > Key: FLINK-5220 > URL: https://issues.apache.org/jira/browse/FLINK-5220 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: zhangjing > Assignee: zhangjing > > The jira is to do projection pushdown optimization. Please refer to the > design document for more details. -- This message was sent by Atlassian JIRA (v6.3.4#6332)