Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4638#discussion_r137737055 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -106,8 +240,191 @@ return deserializationSchema; } - @Override - public String explainSource() { - return ""; + /** + * Assigns ingestion time timestamps and watermarks. + */ + public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> { + + private long curTime = Long.MIN_VALUE; + + @Override + public long extractTimestamp(Row element, long previousElementTimestamp) { + long t = System.currentTimeMillis(); + if (t > curTime) { + curTime = t; + } + return curTime; + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return new Watermark(curTime - 1); + } + } + + protected AssignerWithPeriodicWatermarks<Row> getAssigner() { + return this.timestampAssigner; + } + + /** + * Checks that the provided row time attribute is valid, determines its position in the schema, + * and adjusts the return type. + * + * @param rowtime The attribute to check. + */ + private void configureRowTimeAttribute(String rowtime) { + Preconditions.checkNotNull(rowtime, "Row time attribute must not be null."); + + if (this.ingestionTimeAttribute != null) { + throw new ValidationException( + "You can only specify a row time attribute OR an ingestion time attribute."); + } + + if (this.rowTimeAttribute != null) { + throw new ValidationException( + "Row time attribute can only be specified once."); + } + + // get current fields + String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames(); + TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes(); + + // check if the rowtime field exists and remember position + this.rowtimeFieldPos = -1; --- End diff -- Yes, it's not very nice to move the time attribute into the `StreamRecord`, remove the `Row` field, and copy the `StreamRecord` timestamp back into the `Row`. That's just how the current interface is designed. However, this will be changed with [FLINK-7446](https://issues.apache.org/jira/browse/FLINK-7446). The issue of watermark generation (timestamps don't need to be generated as they are already expected to be in the `Row`) will be addressed by [FLINK-7548](https://issues.apache.org/jira/browse/FLINK-7548). We have to see how much of FLINK-7446 and FLINK-7548 we can solve before Flink 1.4.0. I did this PR to have at least some time attribute support for KafkaTableSources in 1.4.0. Do you need more specialized watermark generators for your use case?
---