[ 
https://issues.apache.org/jira/browse/FLINK-7571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171530#comment-16171530
 ] 

ASF GitHub Bot commented on FLINK-7571:
---------------------------------------

Github user uybhatti commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4635#discussion_r139661930
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala ---
    @@ -28,48 +29,113 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
     class StreamTableSourceTable[T](
         override val tableSource: TableSource[T],
         override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
    -  extends TableSourceTable[T](tableSource, statistic) {
    -
    +  extends TableSourceTable[T](
    +    tableSource,
    +    StreamTableSourceTable.adjustFieldIndexes(tableSource),
    +    StreamTableSourceTable.adjustFieldNames(tableSource),
    +    statistic) {
     
       override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
    +    val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource)
    +
         val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
    +    flinkTypeFactory.buildLogicalRowType(
    +      this.fieldNames,
    +      fieldTypes)
    +  }
     
    -    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
    -    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
    +}
     
    -    val fields = fieldNames.zip(fieldTypes)
    +object StreamTableSourceTable {
    +
    +  private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = {
    +    val (rowtime, proctime) = getTimeIndicators(tableSource)
    +
    +    val original = TableEnvironment.getFieldIndices(tableSource)
    +
    +    // append rowtime marker
    +    val withRowtime = if (rowtime.isDefined) {
    +      original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER
    --- End diff --
    
    Thanks. Actually, we can define an existing field as the rowtime field
    when we convert a DataStream to a Table. That's why I was a little bit
    confused, but since there is a PR for TableSource, that's fine.


> Execution of TableSources with Time Indicators fails
> ----------------------------------------------------
>
>                 Key: FLINK-7571
>                 URL: https://issues.apache.org/jira/browse/FLINK-7571
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Critical
>
> The translation of queries that include a TableSource with time indicators 
> fails during code generation because field names and field indices are 
> not adjusted for the time indicators.
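
The adjustment described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the pull request: the object name, the helper signatures, and the marker values (-1 and -2) are assumptions made for the example, and the real helpers in the diff take a TableSource rather than plain arrays. The idea is that each declared time indicator adds one entry to both the field indices and the field names, so the two stay the same length.

```scala
object TimeIndicatorSketch {
  // Hypothetical marker values (assumed for illustration). Negative numbers
  // cannot collide with real physical field positions, which start at 0.
  val RowtimeMarker: Int = -1
  val ProctimeMarker: Int = -2

  // Append one marker index per declared time indicator.
  def adjustFieldIndexes(
      original: Array[Int],
      rowtime: Option[String],
      proctime: Option[String]): Array[Int] = {
    // append rowtime marker
    val withRowtime =
      if (rowtime.isDefined) original :+ RowtimeMarker else original
    // append proctime marker
    if (proctime.isDefined) withRowtime :+ ProctimeMarker else withRowtime
  }

  // Append the indicator attribute names in the same order as the markers.
  def adjustFieldNames(
      original: Array[String],
      rowtime: Option[String],
      proctime: Option[String]): Array[String] =
    original ++ rowtime.toSeq ++ proctime.toSeq
}
```

With both helpers applied to the same source, the names and indices stay aligned position by position, which is the property the code generator relies on according to the issue description.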



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
