Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147580191
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
 ---
    @@ -56,33 +63,123 @@ class StreamTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      tableSource
    +      tableSource,
    +      selectedFields
         )
       }
     
       override def copy(
           traitSet: RelTraitSet,
    -      newTableSource: TableSource[_])
    -    : PhysicalTableSourceScan = {
    +      newTableSource: TableSource[_]): PhysicalTableSourceScan = {
     
         new StreamTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      newTableSource.asInstanceOf[StreamTableSource[_]]
    +      newTableSource.asInstanceOf[StreamTableSource[_]],
    +      selectedFields
         )
       }
     
       override def translateToPlan(
           tableEnv: StreamTableEnvironment,
           queryConfig: StreamQueryConfig): DataStream[CRow] = {
     
    +    val fieldIndexes = TableSourceUtil.computeIndexMapping(
    +      tableSource,
    +      isStreamTable = true,
    +      selectedFields)
    +
         val config = tableEnv.getConfig
         val inputDataStream = 
tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
    -    convertToInternalRow(
    -      new RowSchema(getRowType),
    +    val outputSchema = new RowSchema(this.getRowType)
    +
    +    // check that declared and actual type of table source DataStream are 
identical
    +    if (inputDataStream.getType != tableSource.getReturnType) {
    +      throw new TableException(s"TableSource of type 
${tableSource.getClass.getCanonicalName} " +
    +        s"returned a DataStream of type ${inputDataStream.getType} that 
does not match with the " +
    +        s"type ${tableSource.getReturnType} declared by the 
TableSource.getReturnType() method. " +
    +        s"Please validate the implementation of the TableSource.")
    +    }
    +
    +    // get expression to extract rowtime attribute
    +    val rowtimeExpression: Option[RexNode] = 
TableSourceUtil.getRowtimeExtractionExpression(
    +      tableSource,
    +      selectedFields,
    +      cluster,
    +      tableEnv.getRelBuilder,
    +      TimeIndicatorTypeInfo.ROWTIME_INDICATOR
    +    )
    +
    +    // ingest table and convert and extract time attributes if necessary
    +    val ingestedTable = convertToInternalRow(
    +      outputSchema,
           inputDataStream,
    -      new StreamTableSourceTable(tableSource),
    -      config)
    +      fieldIndexes,
    +      config,
    +      rowtimeExpression)
    +
    +    // generate watermarks for rowtime indicator
    +    val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
    +      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, 
selectedFields)
    +
    +    val withWatermarks = if (rowtimeDesc.isDefined) {
    +      val rowtimeFieldIdx = 
outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
    +      val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
    +      watermarkStrategy match {
    +        case p: PeriodicWatermarkAssigner =>
    +          val watermarkGenerator = new 
PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
    +          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
    +        case p: PunctuatedWatermarkAssigner =>
    +          val watermarkGenerator = new 
PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
    +          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
    +      }
    +    } else {
    +      // No need to generate watermarks if no rowtime attribute is 
specified.
    +      ingestedTable
    +    }
    +
    +    withWatermarks
    +  }
    +}
    +
    +/**
    +  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
    +  *
    +  * @param timeFieldIdx the index of the rowtime attribute.
    +  * @param assigner the watermark assigner.
    +  */
    +private class PeriodicWatermarkAssignerWrapper(
    +    timeFieldIdx: Int,
    +    assigner: PeriodicWatermarkAssigner)
    +  extends AssignerWithPeriodicWatermarks[CRow] {
    +
    +  override def getCurrentWatermark: Watermark = assigner.getWatermark
    +
    +  override def extractTimestamp(crow: CRow, previousElementTimestamp: 
Long): Long = {
    +    val timestamp: Long = 
crow.row.getField(timeFieldIdx).asInstanceOf[Long]
    +    assigner.nextTimestamp(timestamp)
    +    0L
    --- End diff --
    
    I know the timestamp in the `StreamRecord` is useless for the Table/SQL API 
now, but the returned value will still be set and `StreamRecord.hasTimestamp()` 
will be `true`. How about returning a negative value (e.g., -1) here, so that 
it might be possible for us to erase the timestamp from the `StreamRecord` when the 
extracted rowtime is negative?


---

Reply via email to