Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4638#discussion_r137933653

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -106,8 +240,191 @@
             return deserializationSchema;
         }

    -    @Override
    -    public String explainSource() {
    -        return "";
    +    /**
    +     * Assigns ingestion time timestamps and watermarks.
    +     */
    +    public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> {
    +
    +        private long curTime = Long.MIN_VALUE;
    +
    +        @Override
    +        public long extractTimestamp(Row element, long previousElementTimestamp) {
    +            long t = System.currentTimeMillis();
    +            if (t > curTime) {
    +                curTime = t;
    +            }
    +            return curTime;
    +        }
    +
    +        @Nullable
    +        @Override
    +        public Watermark getCurrentWatermark() {
    +            return new Watermark(curTime - 1);
    +        }
    +    }
    +
    +    protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
    +        return this.timestampAssigner;
    +    }
    +
    +    /**
    +     * Checks that the provided row time attribute is valid, determines its position in the schema,
    +     * and adjusts the return type.
    +     *
    +     * @param rowtime The attribute to check.
    +     */
    +    private void configureRowTimeAttribute(String rowtime) {
    +        Preconditions.checkNotNull(rowtime, "Row time attribute must not be null.");
    +
    +        if (this.ingestionTimeAttribute != null) {
    +            throw new ValidationException(
    +                "You can only specify a row time attribute OR an ingestion time attribute.");
    +        }
    +
    +        if (this.rowTimeAttribute != null) {
    +            throw new ValidationException(
    +                "Row time attribute can only be specified once.");
    +        }
    +
    +        // get current fields
    +        String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames();
    +        TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes();
    +
    +        // check if the rowtime field exists and remember position
    +        this.rowtimeFieldPos = -1;
    --- End diff --

I think you are right. We should not ask each `TableSource` to assign watermarks.
A decorator approach, i.e., a watermark-generating wrapper around a `TableSource`, would be one option. Alternatively, we could give the responsibility for watermark generation to the scan operator. All the `TableSource` would need to provide is the name of the field for which watermarks are generated and the watermark strategy (ascending, bounded, or custom). The generation would be handled completely internally. At that point, we could also access fields of any type (Row, Pojo, Tuple, ...), nested or flat, via code generation. So, we would need to extend or replace the `DefinedRowtimeAttribute` interface so that it also carries the watermark strategy. What do you think @haohui? I already started to work on FLINK-7446 but will also not have much time in the next weeks due to conferences. I hope that we can address these issues with the next release.
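
To make the second option a bit more concrete, here is a rough, Flink-independent sketch of what a source could hand to the scan operator. All names (`RowtimeSketch`, `RowtimeDescriptor`, `WatermarkStrategy`, `WatermarkTracker`, `ascending`, `boundedBy`) are hypothetical and only illustrate the idea; none of them exist in Flink, and the real interface may well look different.

```java
import java.util.Objects;

/** Hypothetical sketch only; none of these types are part of Flink. */
final class RowtimeSketch {

    /** Derives a watermark from the largest timestamp observed so far. */
    interface WatermarkStrategy {
        long watermarkFor(long maxTimestamp);
    }

    /** Timestamps are ascending, so the watermark trails the maximum by 1 ms. */
    static WatermarkStrategy ascending() {
        return maxTs -> maxTs - 1;
    }

    /** Timestamps may be out of order by at most delayMs milliseconds. */
    static WatermarkStrategy boundedBy(long delayMs) {
        return maxTs -> maxTs - delayMs;
    }

    /** The only information a source would have to provide. */
    static final class RowtimeDescriptor {
        final String fieldName;
        final WatermarkStrategy strategy;

        RowtimeDescriptor(String fieldName, WatermarkStrategy strategy) {
            this.fieldName = Objects.requireNonNull(fieldName);
            this.strategy = Objects.requireNonNull(strategy);
        }
    }

    /** What the scan operator could do internally with the descriptor. */
    static final class WatermarkTracker {
        private final WatermarkStrategy strategy;
        private long maxTimestamp = Long.MIN_VALUE;

        WatermarkTracker(WatermarkStrategy strategy) {
            this.strategy = Objects.requireNonNull(strategy);
        }

        void observe(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        long currentWatermark() {
            // Guard against underflow before any element has been observed.
            if (maxTimestamp == Long.MIN_VALUE) {
                return Long.MIN_VALUE;
            }
            return strategy.watermarkFor(maxTimestamp);
        }
    }
}
```

With something like this, a source would only return `new RowtimeDescriptor("ts", boundedBy(5000))` (the field name `ts` is made up), and timestamp extraction plus watermark emission would stay inside the scan operator. A custom strategy is just another implementation of the same interface.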
---