Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/6201

@fhueske @twalthr thanks for the comments. For `from-source`, the only systems I know of that support writing a record along with its timestamp are Kafka10 and Kafka11. To support `from-source` in a table sink, I think we can do the following:

1) Add a connector property, e.g. `connector.support-timestamp`. Only if `connector.support-timestamp` is true will we allow the sink table schema to contain a field with rowtime type `from-source`. Otherwise, an exception will be thrown.

2) If the condition in 1) is satisfied, we will create the corresponding rowtime field in the sink table schema with type LONG. In `TableEnvironment.insertInto()`, we will validate the sink schema against the insertion source. Also, in the `TableSink.emitDataStream()` implementation, we will need to insert a timestamp assigner operator to set `StreamRecord.timestamp` (should we reuse an existing interface, or create a new timestampInserter interface?) and remove the extra rowtime field from `StreamRecord.value` before we emit the datastream to the sink. (For kafkaTableSink, we will also need to invoke `setWriteTimestampToKafka(true)`.)

Please correct me if I missed something here. What do you think?
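To make the two steps above concrete, here is a minimal sketch in plain Java with no Flink dependency. It only illustrates the intended logic; it is not Flink's API. The class `FromSourceSinkSketch`, the helper names, and the use of a `List<Object>` as a stand-in for a row are all hypothetical, and `connector.support-timestamp` is the property proposed in this comment, not an existing option.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposal; none of these names exist in Flink.
class FromSourceSinkSketch {

    // Property key proposed in the comment (an assumption, not a real Flink option).
    static final String SUPPORT_TIMESTAMP = "connector.support-timestamp";

    // Step 1: a `from-source` rowtime field is only allowed if the connector opts in.
    static void validateSinkSchema(Map<String, String> connectorProps,
                                   boolean hasFromSourceRowtime) {
        String raw = connectorProps.get(SUPPORT_TIMESTAMP);
        boolean supported = Boolean.parseBoolean(raw); // null parses to false
        if (hasFromSourceRowtime && !supported) {
            throw new IllegalArgumentException(
                "Sink schema has a from-source rowtime field, but "
                    + SUPPORT_TIMESTAMP + " is not true");
        }
    }

    // Result of step 2: the record timestamp plus the row without the rowtime field.
    static final class TimestampedRow {
        final long timestamp;
        final List<Object> value;

        TimestampedRow(long timestamp, List<Object> value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Step 2: read the LONG rowtime field, then drop it from the emitted value.
    static TimestampedRow extractRowtime(List<Object> row, int rowtimeIndex) {
        Object ts = row.get(rowtimeIndex);
        if (!(ts instanceof Long)) {
            throw new IllegalArgumentException("Rowtime field must be of type LONG");
        }
        List<Object> rest = new ArrayList<>(row);
        rest.remove(rowtimeIndex); // int argument, so this removes by index
        return new TimestampedRow((Long) ts, rest);
    }

    public static void main(String[] args) {
        Map<String, String> props =
            Collections.singletonMap(SUPPORT_TIMESTAMP, "true");
        validateSinkSchema(props, true); // passes because the connector opted in

        List<Object> row = Arrays.<Object>asList("alice", 1530000000000L, 42);
        TimestampedRow out = extractRowtime(row, 1);
        System.out.println(out.timestamp + " " + out.value);
    }
}
```

In a real sink, the timestamp would be written to `StreamRecord.timestamp` by an operator rather than returned in a wrapper object; the wrapper here just keeps the sketch free of Flink types.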
---