GitHub user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4638#discussion_r137737055
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -106,8 +240,191 @@
                return deserializationSchema;
        }
     
    -   @Override
    -   public String explainSource() {
    -           return "";
    +   /**
    +    * Assigns ingestion time timestamps and watermarks.
    +    */
    +   public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> {
    +
    +           private long curTime = Long.MIN_VALUE;
    +
    +           @Override
    +           public long extractTimestamp(Row element, long previousElementTimestamp) {
    +                   long t = System.currentTimeMillis();
    +                   if (t > curTime) {
    +                           curTime = t;
    +                   }
    +                   return curTime;
    +           }
    +
    +           @Nullable
    +           @Override
    +           public Watermark getCurrentWatermark() {
    +                   return new Watermark(curTime - 1);
    +           }
    +   }
    +
    +   protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
    +           return this.timestampAssigner;
    +   }
    +
    +   /**
    +    * Checks that the provided row time attribute is valid, determines its position in the schema,
    +    * and adjusts the return type.
    +    *
    +    * @param rowtime The attribute to check.
    +    */
    +   private void configureRowTimeAttribute(String rowtime) {
    +           Preconditions.checkNotNull(rowtime, "Row time attribute must not be null.");
    +
    +           if (this.ingestionTimeAttribute != null) {
    +                   throw new ValidationException(
    +                           "You can only specify a row time attribute OR an ingestion time attribute.");
    +           }
    +
    +           if (this.rowTimeAttribute != null) {
    +                   throw new ValidationException(
    +                           "Row time attribute can only be specified once.");
    +           }
    +
    +           // get current fields
    +           String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames();
    +           TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes();
    +
    +           // check if the rowtime field exists and remember position
    +           this.rowtimeFieldPos = -1;
    --- End diff --
    
    Yes, it's not very nice to move the time attribute into the `StreamRecord`, remove the `Row` field, and then copy the `StreamRecord` timestamp back into the `Row`, but that is how the current interface is designed. This will change with [FLINK-7446](https://issues.apache.org/jira/browse/FLINK-7446).
    
    Watermark generation will be addressed by [FLINK-7548](https://issues.apache.org/jira/browse/FLINK-7548). Timestamps don't need to be generated because they are already expected to be in the `Row`.
    
    We'll have to see how much of FLINK-7446 and FLINK-7548 we can resolve before Flink 1.4.0. I opened this PR so that KafkaTableSources have at least some time attribute support in 1.4.0. Do you need more specialized watermark generators for your use case?
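
To make "more specialized watermark generators" concrete, here is a minimal sketch of one common variant: a generator that tolerates rows arriving out of order by up to a fixed delay. It is deliberately independent of Flink. The class name `BoundedLatenessWatermarks` and its methods are hypothetical and are not part of this PR or of the Flink API; the sketch only assumes that the rowtime is already present in each row as epoch milliseconds.

```java
/**
 * Hypothetical, Flink-independent sketch of a periodic watermark generator
 * that allows rows to be late by at most a fixed delay.
 */
final class BoundedLatenessWatermarks {

    // Maximum amount of time a row may lag behind the largest rowtime seen so far.
    private final long maxDelayMillis;

    // Largest rowtime observed so far; Long.MIN_VALUE means "no row seen yet".
    private long maxSeenTimestamp = Long.MIN_VALUE;

    BoundedLatenessWatermarks(long maxDelayMillis) {
        if (maxDelayMillis < 0) {
            throw new IllegalArgumentException("maxDelayMillis must not be negative.");
        }
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Records the rowtime that was read from a row (assumed to be epoch milliseconds). */
    void onRowtime(long rowtimeMillis) {
        if (rowtimeMillis > maxSeenTimestamp) {
            maxSeenTimestamp = rowtimeMillis;
        }
    }

    /** Returns the current watermark, or Long.MIN_VALUE if no row has been seen yet. */
    long currentWatermark() {
        // Guard the initial state so that the subtraction cannot underflow.
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxSeenTimestamp - maxDelayMillis;
    }
}
```

The watermark only advances when a row with a larger rowtime arrives, so a late row (within the allowed delay) never moves it backwards.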

