Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4638#discussion_r139534525 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -106,8 +240,191 @@ return deserializationSchema; } - @Override - public String explainSource() { - return ""; + /** + * Assigns ingestion time timestamps and watermarks. + */ + public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> { + + private long curTime = Long.MIN_VALUE; + + @Override + public long extractTimestamp(Row element, long previousElementTimestamp) { + long t = System.currentTimeMillis(); + if (t > curTime) { + curTime = t; + } + return curTime; + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return new Watermark(curTime - 1); + } + } + + protected AssignerWithPeriodicWatermarks<Row> getAssigner() { + return this.timestampAssigner; + } + + /** + * Checks that the provided row time attribute is valid, determines its position in the schema, + * and adjusts the return type. + * + * @param rowtime The attribute to check. + */ + private void configureRowTimeAttribute(String rowtime) { + Preconditions.checkNotNull(rowtime, "Row time attribute must not be null."); + + if (this.ingestionTimeAttribute != null) { + throw new ValidationException( + "You can only specify a row time attribute OR an ingestion time attribute."); + } + + if (this.rowTimeAttribute != null) { + throw new ValidationException( + "Row time attribute can only be specified once."); + } + + // get current fields + String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames(); + TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes(); + + // check if the rowtime field exists and remember position + this.rowtimeFieldPos = -1; --- End diff -- Thanks for the response! I would not merge `DefinedProctimeAttribute` and `DefinedProctimeAttribute` into a single interface. Both are for different types of timestamps (processing time and event time): The idea is to have two interfaces: - `DefinedProctimeAttribute` adds a timestamp attribute for processing time operations. The interface specifies the name of the new attribute. The source will simply add a virtual attribute, that fetches the current time when it is accessed. - `DefinedRowtimeAttribute` (or its successor) specifies an existing attribute (via its name) to be an event time attribute and provides a watermark strategy (ascending, bounded-ooo, custom). If a `TableSource` specifies the interface, the scan will generate watermarks based on the existing field and the watermark strategy. During registration, we check that the field exists and has the right type. So instead of adding a field, we simply ensure that we have watermarks for it and change its type to be a time indicator field. In this design, we `DefinedRowtimeAttribute` specifies the field and the watermark strategy as you proposed.
---