Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/4638#discussion_r137671151 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -106,8 +240,191 @@ return deserializationSchema; } - @Override - public String explainSource() { - return ""; + /** + * Assigns ingestion time timestamps and watermarks. + */ + public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> { + + private long curTime = Long.MIN_VALUE; + + @Override + public long extractTimestamp(Row element, long previousElementTimestamp) { + long t = System.currentTimeMillis(); + if (t > curTime) { + curTime = t; + } + return curTime; + } + + @Nullable + @Override + public Watermark getCurrentWatermark() { + return new Watermark(curTime - 1); + } + } + + protected AssignerWithPeriodicWatermarks<Row> getAssigner() { + return this.timestampAssigner; + } + + /** + * Checks that the provided row time attribute is valid, determines its position in the schema, + * and adjusts the return type. + * + * @param rowtime The attribute to check. + */ + private void configureRowTimeAttribute(String rowtime) { + Preconditions.checkNotNull(rowtime, "Row time attribute must not be null."); + + if (this.ingestionTimeAttribute != null) { + throw new ValidationException( + "You can only specify a row time attribute OR an ingestion time attribute."); + } + + if (this.rowTimeAttribute != null) { + throw new ValidationException( + "Row time attribute can only be specified once."); + } + + // get current fields + String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames(); + TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes(); + + // check if the rowtime field exists and remember position + this.rowtimeFieldPos = -1; --- End diff -- I'm wondering why we need to remove the field here and add it back later on. Changing the orders of the fields seems problematic and can potentially break serialization (in very hacky cases). Another question is that to which extent a customized timestamp assigner can reuse the code here? Is it possible to implement it as a decorator of the table source? That way it opens up the possibilities to reuse the code for other table sources.
---