snehashisp commented on code in PR #16984:
URL: https://github.com/apache/kafka/pull/16984#discussion_r1870825855

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -350,54 +437,49 @@ public Task newTask(Class<? extends Task> taskClass) {
      * @throws ConnectException if the {@link Converter} implementation class could not be found
      */
     public Converter newConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) {
+        return getConverter(config, classPropertyName, null, classLoaderUsage);
+    }
+
+    /**
+     * Used to get a versioned converter. It will always try and get the converter from the set of plugin classloaders.
+     *
+     * @param config the configuration containing the {@link Converter}'s configuration; may not be null
+     * @param classPropertyName the name of the property that contains the name of the {@link Converter} class; may not be null
+     * @param versionPropertyName the name of the property that contains the version of the {@link Converter} class; may not be null
+     * @return the instantiated and configured {@link Converter}; null if the configuration did not define the specified property
+     * @throws ConnectException if the {@link Converter} implementation class could not be found,
+     * @throws VersionedPluginLoadingException if the version requested is not found
+     */
+    public Converter newConverter(AbstractConfig config, String classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) == null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;

Review Comment:
   Yes, it's incorrect; it should be only when the version is specified.
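
For illustration, the rule stated in the comment (use the plugin classloaders only when a version is specified) can be sketched in isolation. This is a minimal sketch, not the PR's code: the `ClassLoaderUsage` enum below is a stand-in with only the two values that appear in the quoted line, a plain `Map` replaces `AbstractConfig`, and the property name and version string are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class ClassLoaderChoiceSketch {
    // Stand-in for Kafka Connect's ClassLoaderUsage; only the two values
    // named in the quoted diff line are modelled here (assumption).
    enum ClassLoaderUsage { CURRENT_CLASSLOADER, PLUGINS }

    // Pick the plugin classloaders only when the version property is set;
    // otherwise fall back to the current classloader.
    static ClassLoaderUsage chooseUsage(Map<String, String> config, String versionPropertyName) {
        return config.get(versionPropertyName) == null
                ? ClassLoaderUsage.CURRENT_CLASSLOADER
                : ClassLoaderUsage.PLUGINS;
    }

    public static void main(String[] args) {
        // Hypothetical property name, used only for this example.
        String versionProperty = "key.converter.plugin.version";
        Map<String, String> config = new HashMap<>();

        // No version configured: the current classloader is used.
        System.out.println(chooseUsage(config, versionProperty)); // prints CURRENT_CLASSLOADER

        // Version configured (hypothetical value): the plugin classloaders are used.
        config.put(versionProperty, "3.9.0");
        System.out.println(chooseUsage(config, versionProperty)); // prints PLUGINS
    }
}
```

With this shape, the Javadoc wording "always" would not hold: the plugin classloaders are consulted only on the branch where a version is present.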