Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4638#discussion_r137933653
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -106,8 +240,191 @@
                return deserializationSchema;
        }
     
    -   @Override
    -   public String explainSource() {
    -           return "";
    +   /**
    +    * Assigns ingestion time timestamps and watermarks.
    +    */
    +   public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> {
    +
    +           private long curTime = Long.MIN_VALUE;
    +
    +           @Override
    +           public long extractTimestamp(Row element, long previousElementTimestamp) {
    +                   long t = System.currentTimeMillis();
    +                   if (t > curTime) {
    +                           curTime = t;
    +                   }
    +                   return curTime;
    +           }
    +
    +           @Nullable
    +           @Override
    +           public Watermark getCurrentWatermark() {
    +                   return new Watermark(curTime - 1);
    +           }
    +   }
    +
    +   protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
    +           return this.timestampAssigner;
    +   }
    +
    +   /**
    +    * Checks that the provided row time attribute is valid, determines its position in the schema,
    +    * and adjusts the return type.
    +    *
    +    * @param rowtime The attribute to check.
    +    */
    +   private void configureRowTimeAttribute(String rowtime) {
    +           Preconditions.checkNotNull(rowtime, "Row time attribute must not be null.");
    +
    +           if (this.ingestionTimeAttribute != null) {
    +                   throw new ValidationException(
    +                           "You can only specify a row time attribute OR an ingestion time attribute.");
    +           }
    +
    +           if (this.rowTimeAttribute != null) {
    +                   throw new ValidationException(
    +                           "Row time attribute can only be specified once.");
    +           }
    +
    +           // get current fields
    +           String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames();
    +           TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes();
    +
    +           // check if the rowtime field exists and remember position
    +           this.rowtimeFieldPos = -1;
    --- End diff --
    
    I think you are right. We should not ask each `TableSource` to assign watermarks.
    
    A decorator approach, i.e., a watermark-generating wrapper around a `TableSource`, would be one option. Alternatively, we could make the scan operator responsible for watermark generation. A `TableSource` would then only need to provide the name of the field for which watermarks are generated and the watermark strategy (ascending, bounded, or custom). The generation would be handled completely internally. At that point, we can also access all fields of any type (Row, POJO, Tuple, ...), nested or flat, via code generation. To support this, we would need to extend or replace the `DefinedRowtimeAttribute` interface, because it has to carry information about the watermark strategy (ascending, bounded, or custom). What do you think, @haohui?
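    The second option could look roughly like the following minimal, dependency-free Java sketch. It rests on stated assumptions: `WatermarkStrategy`, `DefinedRowtimeAttributeWithWatermarks`, and `ScanWatermarkGenerator` are hypothetical names, not existing Flink API, and a `ToLongFunction` stands in for the code-generated field accessor. The source only declares the rowtime field and the strategy; the generator owned by the scan operator tracks the maximum timestamp seen and derives the watermark from it (ascending: maximum minus 1 ms; bounded: maximum minus the declared out-of-orderness), similar in spirit to Flink's `AscendingTimestampExtractor` and `BoundedOutOfOrdernessTimestampExtractor`.

```java
import java.util.function.ToLongFunction;

/**
 * Sketch: the TableSource only declares the rowtime field and a strategy;
 * the scan operator owns watermark generation. All names are hypothetical.
 */
public class WatermarkStrategySketch {

    /** Hypothetical strategies a source could declare. */
    enum WatermarkStrategy { ASCENDING, BOUNDED, CUSTOM }

    /** Hypothetical extension of DefinedRowtimeAttribute that also carries the strategy. */
    interface DefinedRowtimeAttributeWithWatermarks {
        String getRowtimeAttribute();

        WatermarkStrategy getWatermarkStrategy();

        /** Maximum out-of-orderness in milliseconds; only read for BOUNDED. */
        default long getMaxOutOfOrderness() {
            return 0L;
        }
    }

    /** What the scan operator could instantiate from the declaration above. */
    static final class ScanWatermarkGenerator<R> {
        private final ToLongFunction<R> timestampOf; // stands in for a code-generated field accessor
        private final long offset;
        private long maxTimestamp = Long.MIN_VALUE;

        ScanWatermarkGenerator(DefinedRowtimeAttributeWithWatermarks source, ToLongFunction<R> timestampOf) {
            this.timestampOf = timestampOf;
            this.offset = offsetFor(source);
        }

        private static long offsetFor(DefinedRowtimeAttributeWithWatermarks source) {
            switch (source.getWatermarkStrategy()) {
                case ASCENDING:
                    return 1L; // watermark trails the latest timestamp by 1 ms
                case BOUNDED:
                    return source.getMaxOutOfOrderness();
                default:
                    // A CUSTOM strategy would need a user-supplied generator instead.
                    throw new UnsupportedOperationException("CUSTOM strategy requires a user-defined generator");
            }
        }

        /** Called for every record the scan emits; returns the record's rowtime. */
        long onRecord(R record) {
            long ts = timestampOf.applyAsLong(record);
            if (ts > maxTimestamp) {
                maxTimestamp = ts;
            }
            return ts;
        }

        /** Called periodically; non-decreasing because maxTimestamp only grows and the offset is fixed. */
        long currentWatermark() {
            // Avoid underflow (Long.MIN_VALUE - offset) before the first record arrives.
            return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - offset;
        }
    }

    public static void main(String[] args) {
        DefinedRowtimeAttributeWithWatermarks bounded = new DefinedRowtimeAttributeWithWatermarks() {
            public String getRowtimeAttribute() { return "ts"; }
            public WatermarkStrategy getWatermarkStrategy() { return WatermarkStrategy.BOUNDED; }
            public long getMaxOutOfOrderness() { return 5_000L; }
        };
        // Records are plain long[] {timestamp, value} here, purely for illustration.
        ScanWatermarkGenerator<long[]> gen = new ScanWatermarkGenerator<>(bounded, r -> r[0]);
        gen.onRecord(new long[] {10_000L, 1L});
        gen.onRecord(new long[] {8_000L, 2L}); // late, but within the 5 s bound
        System.out.println(gen.currentWatermark()); // prints 5000
    }
}
```

    Keeping the offset logic inside the generator means no source ever touches watermarks. A custom strategy would still need a user-supplied assigner, which is why the sketch rejects CUSTOM rather than guessing a default.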
    
    I have already started working on FLINK-7446, but I will not have much time in the next weeks either, due to conferences. I hope that we can address these issues in the next release.

