[ 
https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221805#comment-16221805
 ] 

ASF GitHub Bot commented on FLINK-7548:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r146587166
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
 ---
    @@ -22,24 +22,30 @@ import org.apache.calcite.plan.{RelOptCluster, 
RelOptTable, RelTraitSet}
     import org.apache.calcite.rel.RelWriter
     import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.rel.core.TableScan
    -import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.TableException
     import org.apache.flink.table.calcite.FlinkTypeFactory
    -import org.apache.flink.table.sources.TableSource
    +import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
StreamTableSourceTable, TableSourceTable}
    +import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
     
     import scala.collection.JavaConverters._
     
     abstract class PhysicalTableSourceScan(
         cluster: RelOptCluster,
         traitSet: RelTraitSet,
         table: RelOptTable,
    -    val tableSource: TableSource[_])
    +    val tableSource: TableSource[_],
    +    val selectedFields: Option[Array[Int]])
       extends TableScan(cluster, traitSet, table) {
     
       override def deriveRowType(): RelDataType = {
         val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    -    flinkTypeFactory.buildLogicalRowType(
    -      TableEnvironment.getFieldNames(tableSource),
    -      TableEnvironment.getFieldTypes(tableSource.getReturnType))
    +    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
    +      case _: StreamTableSourceTable[_] => true
    +      case _: BatchTableSourceTable[_] => false
    +      case t => throw TableException(s"Unknown Table type ${t.getClass}.")
    +    }
    +
    +    TableSourceUtil.getTableSchema(tableSource, selectedFields, 
streamingTable, flinkTypeFactory)
    --- End diff --
    
    I would rename this method to `getRelDataType`. Because it does not return 
our `TableSchema`.


> Support watermark generation for TableSource
> --------------------------------------------
>
>                 Key: FLINK-7548
>                 URL: https://issues.apache.org/jira/browse/FLINK-7548
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Fabian Hueske
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> As discussed in FLINK-7446, currently the TableSource only support to define 
> rowtime field, but not support to extract watermarks from the rowtime field. 
> We can provide a new interface called {{DefinedWatermark}}, which has two 
> methods {{getRowtimeAttribute}} (can only be an existing field) and 
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked 
> deprecated.
> How to support periodic and punctuated watermarks and support some built-in 
> strategies needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to