[ https://issues.apache.org/jira/browse/FLINK-7571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171530#comment-16171530 ]
ASF GitHub Bot commented on FLINK-7571: --------------------------------------- Github user uybhatti commented on a diff in the pull request: https://github.com/apache/flink/pull/4635#discussion_r139661930 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala --- @@ -28,48 +29,113 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo class StreamTableSourceTable[T]( override val tableSource: TableSource[T], override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) - extends TableSourceTable[T](tableSource, statistic) { - + extends TableSourceTable[T]( + tableSource, + StreamTableSourceTable.adjustFieldIndexes(tableSource), + StreamTableSourceTable.adjustFieldNames(tableSource), + statistic) { override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource) + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] + flinkTypeFactory.buildLogicalRowType( + this.fieldNames, + fieldTypes) + } - val fieldNames = TableEnvironment.getFieldNames(tableSource).toList - val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList +} - val fields = fieldNames.zip(fieldTypes) +object StreamTableSourceTable { + + private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = { + val (rowtime, proctime) = getTimeIndicators(tableSource) + + val original = TableEnvironment.getFieldIndices(tableSource) + + // append rowtime marker + val withRowtime = if (rowtime.isDefined) { + original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER --- End diff -- Thanks. Actually, we can define an existing field as the rowtime field when we convert a DataStream to a Table. That's why I was a little bit confused, but since we have a PR for TableSource, it's good. 
> Execution of TableSources with Time Indicators fails > ---------------------------------------------------- > > Key: FLINK-7571 > URL: https://issues.apache.org/jira/browse/FLINK-7571 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.4.0 > Reporter: Fabian Hueske > Assignee: Fabian Hueske > Priority: Critical > > The translation of queries that include a TableSource with time indicators > fails during the code generation because field names and field indices are > not adjusted for the time indicators. -- This message was sent by Atlassian JIRA (v6.4.14#64029)