gharris1727 commented on code in PR #17743: URL: https://github.com/apache/kafka/pull/17743#discussion_r1949998656
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1210,6 +1217,30 @@ public void connectorOffsets(String connName, Map<String, String> connectorConfi } } + private Connector instantiateConnector(Map<String, String> connProps) throws ConnectException { + + final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION); + + try { + return plugins.newConnector(klass, PluginUtils.connectorVersionRequirement(version)); + } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) { + throw new ConnectException( + String.format("Failed to instantiate class for connector %s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e); + } + } + + private ClassLoader instantiateConnectorClassLoader(Map<String, String> connProps) throws ConnectException { Review Comment: I think `instantiate` is incorrect to use in this circumstance. Here and in the exception message. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -737,18 +747,17 @@ public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, log.debug("Fencing out {} task producers for source connector {}", numTasks, connName); try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) { String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); Review Comment: connType is unused. 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -737,18 +747,17 @@ public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, log.debug("Fencing out {} task producers for source connector {}", numTasks, connName); try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) { String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); - ClassLoader connectorLoader = plugins.connectorLoader(connType); - try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + Connector connector = instantiateConnector(connProps); Review Comment: This method only needs the class not the object, you can call `connectorClass`. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java: ########## @@ -197,14 +196,7 @@ public void shouldInstantiateAndConfigureDefaultHeaderConverter() { props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG); createConfig(); - // Because it's not explicitly set on the supplied configuration, the logic to use the current classloader for the connector - // will exit immediately, and so this method always returns null HeaderConverter headerConverter = plugins.newHeaderConverter(config, - WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, - ClassLoaderUsage.CURRENT_CLASSLOADER); - assertNull(headerConverter); - // But we should always find it (or the worker's default) when using the plugins classloader ... - headerConverter = plugins.newHeaderConverter(config, Review Comment: This is a change in behavior, can you explain it? 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -602,7 +602,7 @@ private <U> U newVersionedPlugin( // if the config specifies the class name, use it, otherwise use the default which we can get from config.getClass String classOrAlias = config.originalsStrings().get(classPropertyName); if (classOrAlias == null) { - classOrAlias = config.getClass(classPropertyName).getName(); + classOrAlias = config.getClass(classPropertyName) == null ? null : config.getClass(classPropertyName).getName(); Review Comment: Ah yeah, if there is neither an original string nor a default, getClass will return null. :+1: ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1788,19 +1817,19 @@ public WorkerTask<T, R> build() { Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); - final Class<? extends Connector> connectorClass = plugins.connectorClass( - connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + + final Connector connector = instantiateConnector(connectorConfig.originalsStrings()); Review Comment: This method also does not need the object, just the class. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -549,7 +549,7 @@ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPro } private HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) { - if (!config.originals().containsKey(classPropertyName) && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) { + if (config.getClass(classPropertyName) == null && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) { Review Comment: Can you explain the significance of this change? 
AFAIU, since the connector config includes a default, this if condition will never be satisfied and we'll always configure the SimpleHeaderConverter. This makes specifying the header converter in the worker config completely ineffective. Is that right? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java: ########## @@ -447,6 +448,12 @@ public static Map<String, String> computeAliases(PluginScanResult scanResult) { return aliases; } + public static Function<ClassLoader, LoaderSwap> noOpLoaderSwap() { Review Comment: This is an "incorrect" implementation that shouldn't be in production code. I see it's only used for testing; could you move it to a testing-only class and remove the TransformationStage constructors that use it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org