Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r147661037 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala --- @@ -56,33 +63,123 @@ class StreamTableSourceScan( cluster, traitSet, getTable, - tableSource + tableSource, + selectedFields ) } override def copy( traitSet: RelTraitSet, - newTableSource: TableSource[_]) - : PhysicalTableSourceScan = { + newTableSource: TableSource[_]): PhysicalTableSourceScan = { new StreamTableSourceScan( cluster, traitSet, getTable, - newTableSource.asInstanceOf[StreamTableSource[_]] + newTableSource.asInstanceOf[StreamTableSource[_]], + selectedFields ) } override def translateToPlan( tableEnv: StreamTableEnvironment, queryConfig: StreamQueryConfig): DataStream[CRow] = { + val fieldIndexes = TableSourceUtil.computeIndexMapping( + tableSource, + isStreamTable = true, + selectedFields) + val config = tableEnv.getConfig val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] - convertToInternalRow( - new RowSchema(getRowType), + val outputSchema = new RowSchema(this.getRowType) + + // check that declared and actual type of table source DataStream are identical + if (inputDataStream.getType != tableSource.getReturnType) { + throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " + + s"returned a DataStream of type ${inputDataStream.getType} that does not match with the " + + s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. 
" + + s"Please validate the implementation of the TableSource.") + } + + // get expression to extract rowtime attribute + val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression( + tableSource, + selectedFields, + cluster, + tableEnv.getRelBuilder, + TimeIndicatorTypeInfo.ROWTIME_INDICATOR + ) + + // ingest table and convert and extract time attributes if necessary + val ingestedTable = convertToInternalRow( + outputSchema, inputDataStream, - new StreamTableSourceTable(tableSource), - config) + fieldIndexes, + config, + rowtimeExpression) + + // generate watermarks for rowtime indicator + val rowtimeDesc: Option[RowtimeAttributeDescriptor] = + TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, selectedFields) + + val withWatermarks = if (rowtimeDesc.isDefined) { + val rowtimeFieldIdx = outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName) + val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy + watermarkStrategy match { + case p: PeriodicWatermarkAssigner => + val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p) + ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) + case p: PunctuatedWatermarkAssigner => + val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p) + ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) + } + } else { + // No need to generate watermarks if no rowtime attribute is specified. + ingestedTable + } + + withWatermarks + } +} + +/** + * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]]. + * + * @param timeFieldIdx the index of the rowtime attribute. + * @param assigner the watermark assigner. 
+ */ +private class PeriodicWatermarkAssignerWrapper( + timeFieldIdx: Int, + assigner: PeriodicWatermarkAssigner) + extends AssignerWithPeriodicWatermarks[CRow] { + + override def getCurrentWatermark: Watermark = assigner.getWatermark + + override def extractTimestamp(crow: CRow, previousElementTimestamp: Long): Long = { + val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long] + assigner.nextTimestamp(timestamp) + 0L --- End diff -- I think this does not really matter. We can always erase the timestamp regardless of the value. For some operations that are built on DataStream API primitives (such as the group windows) we have to set it to the correct value and can discard it later. When a Table is converted into a DataStream, the StreamRecord timestamp is set to the rowtime attribute.
---