[ https://issues.apache.org/jira/browse/FLINK-7571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171351#comment-16171351 ]
ASF GitHub Bot commented on FLINK-7571: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4635#discussion_r139633811 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala --- @@ -28,48 +29,113 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo class StreamTableSourceTable[T]( override val tableSource: TableSource[T], override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) - extends TableSourceTable[T](tableSource, statistic) { - + extends TableSourceTable[T]( + tableSource, + StreamTableSourceTable.adjustFieldIndexes(tableSource), + StreamTableSourceTable.adjustFieldNames(tableSource), + statistic) { override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource) + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] + flinkTypeFactory.buildLogicalRowType( + this.fieldNames, + fieldTypes) + } - val fieldNames = TableEnvironment.getFieldNames(tableSource).toList - val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList +} - val fields = fieldNames.zip(fieldTypes) +object StreamTableSourceTable { + + private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = { + val (rowtime, proctime) = getTimeIndicators(tableSource) + + val original = TableEnvironment.getFieldIndices(tableSource) + + // append rowtime marker + val withRowtime = if (rowtime.isDefined) { + original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER --- End diff -- That's how rowtime fields are handled at the moment. They are always appended at the end of the row. There is a JIRA to use existing fields as rowtime fields (https://issues.apache.org/jira/browse/FLINK-7446) but this has not been implemented yet. I'm currently working on a PR for that. > Execution of TableSources with Time Indicators fails > ---------------------------------------------------- > > Key: FLINK-7571 > URL: https://issues.apache.org/jira/browse/FLINK-7571 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.4.0 > Reporter: Fabian Hueske > Assignee: Fabian Hueske > Priority: Critical > > The translation of queries that include a TableSource with time indicators > fails during the code generation because field names and field indicies are > not adjusted for the time indicators. -- This message was sent by Atlassian JIRA (v6.4.14#64029)