Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202271871 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java --- @@ -124,110 +123,127 @@ params.putProperties(properties); // validate - new SchemaValidator(true).validate(params); + // allow Kafka timestamps to be used, watermarks can not be received from source + new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(params); new KafkaValidator().validate(params); - formatValidator().validate(params); - // build - final KafkaTableSource.Builder builder = createBuilderWithFormat(params); + // deserialization schema using format discovery + final DeserializationSchemaFactory<?> formatFactory = TableFormatFactoryService.find( + DeserializationSchemaFactory.class, + properties, + this.getClass().getClassLoader()); + @SuppressWarnings("unchecked") + final DeserializationSchema<Row> deserializationSchema = (DeserializationSchema<Row>) formatFactory + .createDeserializationSchema(properties); - // topic - final String topic = params.getString(CONNECTOR_TOPIC); - builder.forTopic(topic); + // schema + final TableSchema schema = params.getTableSchema(SCHEMA()); + + // proctime + final String proctimeAttribute = SchemaValidator.deriveProctimeAttribute(params).orElse(null); --- End diff -- this is kind of ridiculous :/ Deep inside we work on `Optional` and then we switch to null... Please drag this `Optional` until the very end and do this `orElse(null)` conversion in `org.apache.flink.table.sources.DefinedProctimeAttribute#getProctimeAttribute` implementation.
---