gharris1727 commented on code in PR #17743:
URL: https://github.com/apache/kafka/pull/17743#discussion_r1949998656


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1210,6 +1217,30 @@ public void connectorOffsets(String connName, Map<String, String> connectorConfi
         }
     }
 
+    private Connector instantiateConnector(Map<String, String> connProps) throws ConnectException {
+
+        final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION);
+
+        try {
+            return plugins.newConnector(klass, PluginUtils.connectorVersionRequirement(version));
+        } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) {
+            throw new ConnectException(
+                    String.format("Failed to instantiate class for connector %s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e);
+        }
+    }
+
+    private ClassLoader instantiateConnectorClassLoader(Map<String, String> connProps) throws ConnectException {

Review Comment:
   I think `instantiate` is the wrong term to use here, both in the method name
   and in the exception message.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -737,18 +747,17 @@ public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String,
         log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
         try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
             String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);

Review Comment:
   `connType` is unused.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -737,18 +747,17 @@ public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String,
         log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
         try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
             String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-            ClassLoader connectorLoader = plugins.connectorLoader(connType);
-            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            Connector connector = instantiateConnector(connProps);

Review Comment:
   This method only needs the class, not the object; you can call
   `connectorClass` instead.
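
To illustrate the distinction, here is a minimal, self-contained sketch. It does not use the Kafka `Plugins` API: `DemoConnector`, `resolveClass`, and `newInstance` are hypothetical names, and plain reflection stands in for the plugin lookup. It only shows that resolving a class by name does not run the constructor, while instantiating does.

```java
// Hypothetical stand-in for a connector plugin; this is not a Kafka class.
class DemoConnector {
    static int instances = 0;              // number of constructor calls so far

    DemoConnector() {
        instances++;
    }
}

final class ClassVersusInstance {
    // Resolves the class by name without constructing (or initializing) it.
    static Class<?> resolveClass(String className, ClassLoader loader) {
        try {
            return Class.forName(className, false, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown class " + className, e);
        }
    }

    // Resolves the class and then also runs its no-arg constructor.
    static Object newInstance(String className, ClassLoader loader) {
        try {
            return resolveClass(className, loader).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }
}
```

If a caller only inspects the type (for example to decide between source and sink handling), the first form avoids creating an object that is thrown away immediately.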



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java:
##########
@@ -197,14 +196,7 @@ public void shouldInstantiateAndConfigureDefaultHeaderConverter() {
         props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
         createConfig();
 
-        // Because it's not explicitly set on the supplied configuration, the logic to use the current classloader for the connector
-        // will exit immediately, and so this method always returns null
         HeaderConverter headerConverter = plugins.newHeaderConverter(config,
-                                                                     WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
-                                                                     ClassLoaderUsage.CURRENT_CLASSLOADER);
-        assertNull(headerConverter);
-        // But we should always find it (or the worker's default) when using the plugins classloader ...
-        headerConverter = plugins.newHeaderConverter(config,

Review Comment:
   This is a change in behavior; can you explain it?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -602,7 +602,7 @@ private <U> U newVersionedPlugin(
                 // if the config specifies the class name, use it, otherwise use the default which we can get from config.getClass
                 String classOrAlias = config.originalsStrings().get(classPropertyName);
                 if (classOrAlias == null) {
-                    classOrAlias = config.getClass(classPropertyName).getName();
+                    classOrAlias = config.getClass(classPropertyName) == null ? null : config.getClass(classPropertyName).getName();

Review Comment:
   Ah yeah, if there is neither an original string nor a default, getClass will 
return null. :+1:
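
As a side note, the same null-safe fallback can be written with a single lookup held in a local variable. The sketch below is self-contained and only models the idea: `ClassNameLookup` and its two maps are hypothetical stand-ins for `config.originalsStrings()` and `config.getClass(...)`, not the real `AbstractConfig` API.

```java
import java.util.Map;

final class ClassNameLookup {
    // originals: raw strings supplied by the user (assumed stand-in for originalsStrings()).
    // parsed: resolved classes, including defaults; a missing key models getClass returning null.
    static String classOrAlias(Map<String, String> originals,
                               Map<String, Class<?>> parsed,
                               String classPropertyName) {
        String classOrAlias = originals.get(classPropertyName);
        if (classOrAlias == null) {
            // Look the class up once, then guard against "no original and no default".
            Class<?> fallback = parsed.get(classPropertyName);
            classOrAlias = fallback == null ? null : fallback.getName();
        }
        return classOrAlias;
    }
}
```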



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1788,19 +1817,19 @@ public WorkerTask<T, R> build() {
             Objects.requireNonNull(classLoader, "Classloader used by task cannot be null");
 
             ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
-            final Class<? extends Connector> connectorClass = plugins.connectorClass(
-                    connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+
+            final Connector connector = instantiateConnector(connectorConfig.originalsStrings());

Review Comment:
   This method also does not need the object, just the class.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -549,7 +549,7 @@ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPro
     }
 
     private HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) {
-        if (!config.originals().containsKey(classPropertyName) && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+        if (config.getClass(classPropertyName) == null && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {

Review Comment:
   Can you explain the significance of this change?
   
   AFAIU, since the connector config includes a default, this `if` condition
   will never be satisfied and we'll always configure the SimpleHeaderConverter.
   This makes specifying the header converter in the worker config completely
   ineffective. Is that right?
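
The concern can be reproduced with a small self-contained model. This is a sketch under stated assumptions, not the real `AbstractConfig`: `DefaultedConfig`, the key `header.converter`, and the default value are illustrative only, and the `ClassLoaderUsage` half of the condition is left out. It shows that an "explicitly set" check and a "resolved value is null" check diverge as soon as a default is declared.

```java
import java.util.Map;

// Hypothetical model of a config that separates user-supplied values from declared defaults.
final class DefaultedConfig {
    private final Map<String, String> originals;   // what the user actually supplied
    private final Map<String, String> defaults;    // defaults declared by the config definition

    DefaultedConfig(Map<String, String> originals, Map<String, String> defaults) {
        this.originals = originals;
        this.defaults = defaults;
    }

    // Resolved value: the user's value if present, otherwise the default (may be null).
    String resolved(String key) {
        return originals.containsKey(key) ? originals.get(key) : defaults.get(key);
    }

    // Shape of the old condition: true when the user did not set the key.
    boolean skipByOriginals(String key) {
        return !originals.containsKey(key);
    }

    // Shape of the new condition: true only when there is neither a value nor a default.
    boolean skipByResolvedValue(String key) {
        return resolved(key) == null;
    }
}
```

With no user value and a declared default, `skipByOriginals` returns true while `skipByResolvedValue` returns false, so a fallback that depends on the early exit would no longer be reached.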



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##########
@@ -447,6 +448,12 @@ public static Map<String, String> computeAliases(PluginScanResult scanResult) {
         return aliases;
     }
 
+    public static Function<ClassLoader, LoaderSwap> noOpLoaderSwap() {

Review Comment:
   This is an "incorrect" implementation that shouldn't be in production code.
   Since it's only used for testing, could you move it to a testing-only class
   and remove the TransformationStage constructors that use it?
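
For context, here is one reading of why a no-op swap differs from a real one, as a self-contained sketch. The names `Swap` and `LoaderSwaps` are hypothetical and this is not the Kafka `LoaderSwap` class; it assumes a real swap sets the thread context class loader and restores the previous one on close, while the no-op variant leaves it untouched.

```java
import java.util.function.Function;

// Hypothetical handle that undoes a class loader swap when closed.
interface Swap extends AutoCloseable {
    @Override
    void close();   // narrowed so callers need not handle a checked exception
}

final class LoaderSwaps {
    // Sets the thread context class loader and restores the previous one on close.
    static Function<ClassLoader, Swap> real() {
        return replacement -> {
            ClassLoader previous = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(replacement);
            return () -> Thread.currentThread().setContextClassLoader(previous);
        };
    }

    // Ignores the requested loader entirely; code inside the block keeps the old context loader.
    static Function<ClassLoader, Swap> noOp() {
        return ignored -> () -> { };
    }
}
```

Code that runs inside a `noOp()` swap never sees the requested loader, which is convenient in unit tests but would silently skip isolation if it were used on a production path.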



