Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r147580191 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala --- @@ -56,33 +63,123 @@ class StreamTableSourceScan( cluster, traitSet, getTable, - tableSource + tableSource, + selectedFields ) } override def copy( traitSet: RelTraitSet, - newTableSource: TableSource[_]) - : PhysicalTableSourceScan = { + newTableSource: TableSource[_]): PhysicalTableSourceScan = { new StreamTableSourceScan( cluster, traitSet, getTable, - newTableSource.asInstanceOf[StreamTableSource[_]] + newTableSource.asInstanceOf[StreamTableSource[_]], + selectedFields ) } override def translateToPlan( tableEnv: StreamTableEnvironment, queryConfig: StreamQueryConfig): DataStream[CRow] = { + val fieldIndexes = TableSourceUtil.computeIndexMapping( + tableSource, + isStreamTable = true, + selectedFields) + val config = tableEnv.getConfig val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] - convertToInternalRow( - new RowSchema(getRowType), + val outputSchema = new RowSchema(this.getRowType) + + // check that declared and actual type of table source DataStream are identical + if (inputDataStream.getType != tableSource.getReturnType) { + throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " + + s"returned a DataStream of type ${inputDataStream.getType} that does not match with the " + + s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. 
" + + s"Please validate the implementation of the TableSource.") + } + + // get expression to extract rowtime attribute + val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression( + tableSource, + selectedFields, + cluster, + tableEnv.getRelBuilder, + TimeIndicatorTypeInfo.ROWTIME_INDICATOR + ) + + // ingest table and convert and extract time attributes if necessary + val ingestedTable = convertToInternalRow( + outputSchema, inputDataStream, - new StreamTableSourceTable(tableSource), - config) + fieldIndexes, + config, + rowtimeExpression) + + // generate watermarks for rowtime indicator + val rowtimeDesc: Option[RowtimeAttributeDescriptor] = + TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, selectedFields) + + val withWatermarks = if (rowtimeDesc.isDefined) { + val rowtimeFieldIdx = outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName) + val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy + watermarkStrategy match { + case p: PeriodicWatermarkAssigner => + val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p) + ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) + case p: PunctuatedWatermarkAssigner => + val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p) + ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) + } + } else { + // No need to generate watermarks if no rowtime attribute is specified. + ingestedTable + } + + withWatermarks + } +} + +/** + * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]]. + * + * @param timeFieldIdx the index of the rowtime attribute. + * @param assigner the watermark assigner. 
+ */ +private class PeriodicWatermarkAssignerWrapper( + timeFieldIdx: Int, + assigner: PeriodicWatermarkAssigner) + extends AssignerWithPeriodicWatermarks[CRow] { + + override def getCurrentWatermark: Watermark = assigner.getWatermark + + override def extractTimestamp(crow: CRow, previousElementTimestamp: Long): Long = { + val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long] + assigner.nextTimestamp(timestamp) + 0L --- End diff -- I know the timestamp in the `StreamRecord` is useless for the Table/SQL API now, but the returned value will still be set and `StreamRecord.hasTimestamp()` will be `true`. How about returning a negative value (e.g., -1) here, so that it might be possible for us to erase the timestamp from the `StreamRecord` when the extracted rowtime is negative?
---