Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/6201
  
    @fhueske @twalthr thanks for the comments. In `from-source`, the only 
system i know of is Kafka10 or Kafka11, which support writing record along with 
timestamp. To support `from-source` in table sink, I think we can do the 
following:
    1) add a connector property, e.g. connector.support-timestamp. Only if 
connector.support-timestamp is true, we will allow the sink table schema to 
contain a field with rowtime type `from-source`. Otherwise, an exception will 
be thrown.
    2) if the condition in 1) is satisfied, we will create corresponding 
rowtime field in the sink table schema with type LONG, in 
TableEnvironment.insertInto(), we will validate the sink schema against the 
insertion source. Also, in the TableSink.emitDataStream() implementation, we 
will need to insert an timestamp assigner operator to set 
StreamRecord.timestamp (should we reuse existing interface, or create a new 
timestampInserter interface?) and remove the extra rowtime field from 
StreamRecord.value before we emit the datastream to the sink. (for 
kafkaTableSink, we will also need to invoke setWriteTimestampToKafka(true))
    
    Please correct me if I missed something here. What do you think?


---

Reply via email to