Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r146587166 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala --- @@ -22,24 +22,30 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet} import org.apache.calcite.rel.RelWriter import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.sources.TableSource +import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable} +import org.apache.flink.table.sources.{TableSource, TableSourceUtil} import scala.collection.JavaConverters._ abstract class PhysicalTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - val tableSource: TableSource[_]) + val tableSource: TableSource[_], + val selectedFields: Option[Array[Int]]) extends TableScan(cluster, traitSet, table) { override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildLogicalRowType( - TableEnvironment.getFieldNames(tableSource), - TableEnvironment.getFieldTypes(tableSource.getReturnType)) + val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match { + case _: StreamTableSourceTable[_] => true + case _: BatchTableSourceTable[_] => false + case t => throw TableException(s"Unknown Table type ${t.getClass}.") + } + + TableSourceUtil.getTableSchema(tableSource, selectedFields, streamingTable, flinkTypeFactory) --- End diff -- I would rename this method to `getRelDataType`. Because it does not return our `TableSchema`.
---