Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4638#discussion_r139714565 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -62,10 +88,107 @@ DeserializationSchema<Row> deserializationSchema, TypeInformation<Row> typeInfo) { - this.topic = Preconditions.checkNotNull(topic, "Topic"); - this.properties = Preconditions.checkNotNull(properties, "Properties"); - this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema"); - this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information"); + this.topic = Preconditions.checkNotNull(topic, "Topic must not be null."); + this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); + this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema must not be null."); + this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null."); + } + + /** + * Adds processing time attribute to the table. The attribute is appended to each row. + * + * @param proctime The name of the added processing time attribute. + */ + public void addProcTimeAttribute(String proctime) { + Preconditions.checkNotNull(proctime, "Processing time attribute must not be null."); + this.procTimeAttribute = proctime; + } + + /** + * Adds an ingestion time attribute to the table. The attribute is append at the end of each row. + * + * <p>For each row, the ingestion time attribute is initialized with the current time when the row + * is read from Kafka. From there on, it behaves as an event time attribute. + * + * @param ingestionTime The name of the added ingestion time attribute. + */ + public void addIngestionTimeAttribute(String ingestionTime) { + Preconditions.checkNotNull(ingestionTime, "Ingestion time attribute must not be null."); + if (this.rowTimeAttribute != null) { + throw new ValidationException( + "You can only specify an ingestion time attribute OR a row time attribute."); + } + this.rowTimeAttribute = ingestionTime; --- End diff -- Yes, that's inconsistently handled. Thanks for pointing this out. I removed `this.ingestionTimeAttribute`
---