[ https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16159598#comment-16159598 ]
ASF GitHub Bot commented on FLINK-6563:
---------------------------------------

Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4638#discussion_r137913506

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -106,8 +240,191 @@
             return deserializationSchema;
         }
    -    @Override
    -    public String explainSource() {
    -        return "";
    +    /**
    +     * Assigns ingestion time timestamps and watermarks.
    +     */
    +    public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> {
    +
    +        private long curTime = Long.MIN_VALUE;
    +
    +        @Override
    +        public long extractTimestamp(Row element, long previousElementTimestamp) {
    +            long t = System.currentTimeMillis();
    +            if (t > curTime) {
    +                curTime = t;
    +            }
    +            return curTime;
    +        }
    +
    +        @Nullable
    +        @Override
    +        public Watermark getCurrentWatermark() {
    +            return new Watermark(curTime - 1);
    +        }
    +    }
    +
    +    protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
    +        return this.timestampAssigner;
    +    }
    +
    +    /**
    +     * Checks that the provided row time attribute is valid, determines its position in the schema,
    +     * and adjusts the return type.
    +     *
    +     * @param rowtime The attribute to check.
    +     */
    +    private void configureRowTimeAttribute(String rowtime) {
    +        Preconditions.checkNotNull(rowtime, "Row time attribute must not be null.");
    +
    +        if (this.ingestionTimeAttribute != null) {
    +            throw new ValidationException(
    +                "You can only specify a row time attribute OR an ingestion time attribute.");
    +        }
    +
    +        if (this.rowTimeAttribute != null) {
    +            throw new ValidationException(
    +                "Row time attribute can only be specified once.");
    +        }
    +
    +        // get current fields
    +        String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames();
    +        TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes();
    +
    +        // check if the rowtime field exists and remember position
    +        this.rowtimeFieldPos = -1;
    --- End diff --

    What about accelerating the efforts on FLINK-7446 and FLINK-7548?

    Unfortunately, our use cases go a little beyond that, so this PR will not solve the problem out of the box. In one use case the timestamp is a `double` instead of a `bigint`; in another use case the timestamp sits in a nested structure.

    That is why I'm more inclined toward a decorator-based approach, which is easier to customize.


> Expose time indicator attributes in the KafkaTableSource
> --------------------------------------------------------
>
>                 Key: FLINK-6563
>                 URL: https://issues.apache.org/jira/browse/FLINK-6563
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>            Priority: Critical
>             Fix For: 1.4.0
>
>
> This is a follow-up to FLINK-5884.
> FLINK-5884 requires the {{TableSource}} interfaces to expose the
> processing time and the event time for the data stream. This JIRA proposes to
> expose these two pieces of information in the Kafka table source.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
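
The review comment mentions two cases the PR does not cover out of the box: a timestamp stored as a `double` rather than a `bigint`, and a timestamp that sits in a nested structure. The following is a minimal sketch of what a decorator-based extractor could look like for those cases. It is plain Java with no Flink dependency: `SimpleRow`, `doubleSeconds`, and `nested` are hypothetical names invented for illustration and are not part of the PR or of any Flink API. It also assumes the `double` holds fractional epoch seconds, which the comment does not state.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical sketch of composable timestamp extractors; not Flink code. */
public class TimestampDecoratorSketch {

    /** Stand-in for a row: positional fields, where a field may itself be a row. */
    public static final class SimpleRow {
        private final List<Object> fields;

        public SimpleRow(Object... fields) {
            this.fields = Arrays.asList(fields);
        }

        public Object getField(int pos) {
            return fields.get(pos);
        }
    }

    /**
     * Base extractor: reads a double at the given position and converts it to
     * epoch milliseconds. ASSUMPTION: the double is fractional epoch seconds.
     */
    public static ToLongFunction<SimpleRow> doubleSeconds(int pos) {
        return row -> Math.round(((Double) row.getField(pos)) * 1000.0);
    }

    /**
     * Decorator: descends into the nested row at the given position and
     * delegates to the wrapped extractor.
     */
    public static ToLongFunction<SimpleRow> nested(int pos, ToLongFunction<SimpleRow> inner) {
        return row -> inner.applyAsLong((SimpleRow) row.getField(pos));
    }

    public static void main(String[] args) {
        // Flat row: the timestamp is the double in field 1.
        SimpleRow flat = new SimpleRow("key", 12.5);
        System.out.println(doubleSeconds(1).applyAsLong(flat));

        // Nested row: the timestamp is field 0 of the row stored in field 1.
        SimpleRow outer = new SimpleRow("key", new SimpleRow(12.5));
        System.out.println(nested(1, doubleSeconds(0)).applyAsLong(outer));
    }
}
```

Because each extractor is just a function from a row to a `long`, the two cases compose without the table source needing to know about either. That composability is the customization argument made in the comment.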