Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/4638#discussion_r139553991 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -106,8 +240,191 @@ return deserializationSchema; } - @Override - public String explainSource() { - return ""; + /** + * Assigns ingestion time timestamps and watermarks. + */ + public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> { + + private long curTime = Long.MIN_VALUE; + + @Override + public long extractTimestamp(Row element, long previousElementTimestamp) { + long t = System.currentTimeMillis(); + if (t > curTime) { + curTime = t; + } + return curTime; + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return new Watermark(curTime - 1); + } + } + + protected AssignerWithPeriodicWatermarks<Row> getAssigner() { + return this.timestampAssigner; + } + + /** + * Checks that the provided row time attribute is valid, determines its position in the schema, + * and adjusts the return type. + * + * @param rowtime The attribute to check. + */ + private void configureRowTimeAttribute(String rowtime) { + Preconditions.checkNotNull(rowtime, "Row time attribute must not be null."); + + if (this.ingestionTimeAttribute != null) { + throw new ValidationException( + "You can only specify a row time attribute OR an ingestion time attribute."); + } + + if (this.rowTimeAttribute != null) { + throw new ValidationException( + "Row time attribute can only be specified once."); + } + + // get current fields + String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames(); + TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes(); + + // check if the rowtime field exists and remember position + this.rowtimeFieldPos = -1; --- End diff -- Sorry for not stating it more clearly.
I was proposing a tighter connection between `TimestampAssigner` and `DefinedProctimeAttribute`/`DefinedRowtimeAttribute`, but not merging `DefinedProctimeAttribute` and `DefinedRowtimeAttribute`. In our use cases, a trivial implementation of `TimestampAssigner` is not enough to extract the rowtime: we need to convert the corresponding field from `double` to `bigint`. It would be great if we could do this a little more cleanly.
---