C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1378078025


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -380,6 +387,110 @@ protected Map<String, ConfigValue> 
validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue converterConfigValue,
+            Class<T> converterInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String converterName,
+            String converterProperty,
+            ConverterType converterType
+    ) {
+        String converterClass = connectorConfig.get(converterProperty);
+
+        if (converterClass == null
+                || converterConfigValue == null
+                || !converterConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified 
but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T converterInstance;
+        try {
+            converterInstance = Utils.newInstance(converterClass, 
converterInterface);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate {} class {}; this should have 
been caught by prior validation logic", converterName, converterClass, e);
+            converterConfigValue.addErrorMessage("Failed to load class " + 
converterClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+
+        try (Utils.UncheckedCloseable close = () -> 
Utils.maybeCloseQuietly(converterInstance, converterName + " " + 
converterClass);) {

Review Comment:
   Thanks for an exhaustive analysis, I see the error of my ways now 🙏
   
   I've pushed a new commit migrating the `Utils::closeQuietly` call to a 
`finally` block. The `MaybeCloseable` idea is fascinating but I don't think the 
additional cognitive burden is worth the small bump in ergonomics for this 
change. I do think it might be worth it to apply it across the whole code base 
(sort of like how https://github.com/apache/kafka/pull/13185 forces the whole 
code base to be aware of plugin-thrown exceptions, at least for `Connector` 
instances).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -380,6 +387,110 @@ protected Map<String, ConfigValue> 
validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue converterConfigValue,
+            Class<T> converterInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String converterName,
+            String converterProperty,
+            ConverterType converterType
+    ) {
+        String converterClass = connectorConfig.get(converterProperty);
+
+        if (converterClass == null
+                || converterConfigValue == null
+                || !converterConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified 
but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T converterInstance;
+        try {
+            converterInstance = Utils.newInstance(converterClass, 
converterInterface);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate {} class {}; this should have 
been caught by prior validation logic", converterName, converterClass, e);
+            converterConfigValue.addErrorMessage("Failed to load class " + 
converterClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+
+        try (Utils.UncheckedCloseable close = () -> 
Utils.maybeCloseQuietly(converterInstance, converterName + " " + 
converterClass);) {

Review Comment:
   Thanks for an exhaustive analysis, I see the error of my ways now 🙏
   
   I've pushed a new commit migrating the `Utils::closeQuietly` call to a 
`finally` block. The `MaybeCloseable` idea is fascinating but I don't think the 
additional cognitive burden is worth the small bump in ergonomics for this 
change. I do think it might be worth it to apply it across the whole code base 
(sort of like how https://github.com/apache/kafka/pull/13185 forces the whole 
code base to be aware of plugin-thrown exceptions, at least for `Connector` 
instances).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -380,6 +387,110 @@ protected Map<String, ConfigValue> 
validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue converterConfigValue,
+            Class<T> converterInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String converterName,
+            String converterProperty,
+            ConverterType converterType
+    ) {
+        String converterClass = connectorConfig.get(converterProperty);
+
+        if (converterClass == null
+                || converterConfigValue == null
+                || !converterConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified 
but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T converterInstance;
+        try {
+            converterInstance = Utils.newInstance(converterClass, 
converterInterface);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate {} class {}; this should have 
been caught by prior validation logic", converterName, converterClass, e);
+            converterConfigValue.addErrorMessage("Failed to load class " + 
converterClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+
+        try (Utils.UncheckedCloseable close = () -> 
Utils.maybeCloseQuietly(converterInstance, converterName + " " + 
converterClass);) {

Review Comment:
   Thanks for an exhaustive analysis, I now see the error of my ways 🙏
   
   I've pushed a new commit migrating the `Utils::closeQuietly` call to a 
`finally` block. The `MaybeCloseable` idea is fascinating but I don't think the 
additional cognitive burden is worth the small bump in ergonomics for this 
change. I do think it might be worth it to apply it across the whole code base 
(sort of like how https://github.com/apache/kafka/pull/13185 forces the whole 
code base to be aware of plugin-thrown exceptions, at least for `Connector` 
instances).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to