klam-shop commented on code in PR #109:
URL: https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1700749005
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java:
##########
@@ -636,21 +618,25 @@ public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject(
     private static Map<String, String> autoCompleteSchemaRegistrySubject(
             Map<String, String> options) {
         Configuration configuration = Configuration.fromMap(options);
-        // the subject autoComplete should only be used in sink, check the topic first
-        validateSinkTopic(configuration);
-        final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT);
-        final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT);
-        final Optional<String> format = configuration.getOptional(FORMAT);
-        final String topic = configuration.get(TOPIC).get(0);
-
-        if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
-            autoCompleteSubject(configuration, format.get(), topic + "-value");
-        } else if (valueFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
-            autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
-        }
+        // the subject autoComplete should only be used in sink with a single topic, check the topic

Review Comment:
For the subject autoComplete to work with the SCHEMA_REGISTRY_FORMATS when writing to multiple topics, I think we need a follow-up issue so that the formats can handle multiple subjects dynamically. Writing to one static subject for many topics would not match the Confluent Schema Registry default [TopicNameStrategy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy), which derives the subject from the topic name, e.g. `topic-value` and `topic-key` subjects for topic `topic`. I decided to disable the logic here when multiple topics are being produced to, and to leave the extension for future work.
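
To illustrate the mismatch, here is a minimal sketch of the behavior I have in mind. It is not the connector's actual code: `SubjectSketch`, `valueSubject`, `keySubject`, and `autoCompleteValueSubject` are hypothetical names used only for this example. It assumes the TopicNameStrategy convention of `<topic>-value` and `<topic>-key`, and it only auto-completes a subject when exactly one sink topic is configured.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only; none of these names exist in the connector.
class SubjectSketch {

    // Subject that TopicNameStrategy derives for a record value.
    static String valueSubject(String topic) {
        return topic + "-value";
    }

    // Subject that TopicNameStrategy derives for a record key.
    static String keySubject(String topic) {
        return topic + "-key";
    }

    // A static subject can only be derived up front when there is exactly one
    // sink topic. With several topics, each record would need its own subject,
    // so nothing is auto-completed.
    static Optional<String> autoCompleteValueSubject(List<String> topics) {
        if (topics == null || topics.size() != 1) {
            return Optional.empty();
        }
        return Optional.of(valueSubject(topics.get(0)));
    }

    public static void main(String[] args) {
        // Single topic: the subject is derivable from the table options.
        System.out.println(
                autoCompleteValueSubject(Collections.singletonList("orders")));
        // Multiple topics: auto-completion is skipped.
        System.out.println(
                autoCompleteValueSubject(Arrays.asList("orders", "payments")));
    }
}
```

With a single topic `orders` this yields `orders-value`; with `orders` and `payments` it yields an empty `Optional`, since one static subject could not match both `orders-value` and `payments-value`.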