[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697953#comment-15697953 ]
ASF GitHub Bot commented on FLINK-3848: --------------------------------------- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671128 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala --- @@ -102,11 +107,43 @@ class CsvTableSource( * Do not use it in Table API programs. */ override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = { - streamExecEnv.createInput(createCsvInput(), returnType) + streamExecEnv.createInput(createCsvInput(), projectionReturnTypeWithOrder.getOrElse(returnType)) + } + + /** Returns a [[TableSource]] with ability to project fields */ + override def setProjection(fields: Array[Int]): CsvTableSource = { + val mask = new Array[Boolean](fieldNames.length) + fields.foreach(mask(_) = true) + val orderedReturnType = fields.map(fieldTypes(_)) + val indexDiff = fieldNames.length - fields.length + val order = fields.map(f => f - indexDiff).zipWithIndex.sortBy(_._1).map(_._2) + val source = copy --- End diff -- I would like use the `newFieldNames` and `newFieldTypes` to create a new `CsvTableSource` instead of a same copy, so that we do not need `orderedReturnType`. What do you think ? > Add ProjectableTableSource interface and translation rule > --------------------------------------------------------- > > Key: FLINK-3848 > URL: https://issues.apache.org/jira/browse/FLINK-3848 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Anton Solovev > > Add a {{ProjectableTableSource}} interface for {{TableSource}} implementation > that support projection push-down. > The interface could look as follows > {code} > def trait ProjectableTableSource { > def setProjection(fields: Array[String]): Unit > } > {code} > In addition we need Calcite rules to push a projection into a TableScan that > refers to a {{ProjectableTableSource}}. We might need to tweak the cost model > as well to push the optimizer in the right direction. > Moreover, the {{CsvTableSource}} could be extended to implement > {{ProjectableTableSource}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)