[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221805#comment-16221805 ]
ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r146587166

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala ---
    @@ -22,24 +22,30 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet}
     import org.apache.calcite.rel.RelWriter
     import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.rel.core.TableScan
    -import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.TableException
     import org.apache.flink.table.calcite.FlinkTypeFactory
    -import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
    +import org.apache.flink.table.sources.{TableSource, TableSourceUtil}

     import scala.collection.JavaConverters._

     abstract class PhysicalTableSourceScan(
         cluster: RelOptCluster,
         traitSet: RelTraitSet,
         table: RelOptTable,
    -    val tableSource: TableSource[_])
    +    val tableSource: TableSource[_],
    +    val selectedFields: Option[Array[Int]])
       extends TableScan(cluster, traitSet, table) {

       override def deriveRowType(): RelDataType = {
         val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    -    flinkTypeFactory.buildLogicalRowType(
    -      TableEnvironment.getFieldNames(tableSource),
    -      TableEnvironment.getFieldTypes(tableSource.getReturnType))
    +    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
    +      case _: StreamTableSourceTable[_] => true
    +      case _: BatchTableSourceTable[_] => false
    +      case t => throw TableException(s"Unknown Table type ${t.getClass}.")
    +    }
    +
    +    TableSourceUtil.getTableSchema(tableSource, selectedFields, streamingTable, flinkTypeFactory)
    --- End diff --

    I would rename this method to `getRelDataType`, because it does not return our `TableSchema`.

> Support watermark generation for TableSource
> --------------------------------------------
>
>                 Key: FLINK-7548
>                 URL: https://issues.apache.org/jira/browse/FLINK-7548
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Fabian Hueske
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, a TableSource can currently only define a
> rowtime field; it cannot extract watermarks from that field.
> We can provide a new interface called {{DefinedWatermark}}, which has two
> methods: {{getRowtimeAttribute}} (which can only refer to an existing field)
> and {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} interface
> will be marked as deprecated.
> How to support periodic and punctuated watermarks, and which built-in
> strategies to provide, needs further discussion.
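
For illustration only, the interface proposed in the issue could be sketched roughly as follows. The issue names only {{DefinedWatermark}}, {{getRowtimeAttribute}} and {{getWatermarkGenerator}}; the return types, the `WatermarkGenerator` trait, the `BoundedDelayGenerator` strategy and the `ClickSource` class are assumptions made up for this sketch and are not Flink's actual API.

```scala
// Hypothetical sketch: none of these types are taken from Flink itself.

/** Assumed strategy type that turns observed rowtime values into watermarks. */
trait WatermarkGenerator {
  /** Called with the rowtime value (epoch millis) of each emitted row. */
  def onRowtime(timestamp: Long): Unit

  /** Current watermark: rows with a smaller rowtime are no longer expected. */
  def currentWatermark: Long
}

/** Assumed built-in strategy: tolerate rows that arrive up to maxDelayMs late. */
class BoundedDelayGenerator(maxDelayMs: Long) extends WatermarkGenerator {
  private var maxSeen: Long = Long.MinValue

  override def onRowtime(timestamp: Long): Unit =
    maxSeen = math.max(maxSeen, timestamp)

  // Guard against underflow before any row has been observed.
  override def currentWatermark: Long =
    if (maxSeen == Long.MinValue) Long.MinValue else maxSeen - maxDelayMs
}

/** The interface named in the issue, with the two methods it mentions. */
trait DefinedWatermark {
  /** Name of the rowtime attribute; must refer to an existing field. */
  def getRowtimeAttribute: String

  /** Strategy used to derive watermarks from the rowtime attribute. */
  def getWatermarkGenerator: WatermarkGenerator
}

/** Hypothetical source with the fields (user, ts), using ts as rowtime. */
class ClickSource extends DefinedWatermark {
  val fieldNames: Array[String] = Array("user", "ts")

  override def getRowtimeAttribute: String = "ts"

  override def getWatermarkGenerator: WatermarkGenerator =
    new BoundedDelayGenerator(5000L)
}
```

In this sketch the generator is a separate object returned by the source, which would leave room for the periodic and punctuated strategies the issue leaves open for discussion.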