C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092133488


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +479,22 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");

Review Comment:
   We still need an `instanceof` check here:
   ```suggestion
                   if (object instanceof AutoCloseable) {
                       Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");
                   }
   ```



##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -137,6 +137,7 @@
 import static org.mockito.Mockito.when;
 
 public class KafkaProducerTest {
+    private final int targetInterceptor = 3;

Review Comment:
   Nit: If we're not using this field in any other test, it probably makes
more sense to inline it directly into the test case where it's used.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -168,6 +169,7 @@ public class KafkaConsumerTest {
     private final int defaultApiTimeoutMs = 60000;
     private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
     private final int heartbeatIntervalMs = 1000;
+    private final int targetInterceptor = 3;

Review Comment:
   Nit: If we're not using this field in any other test, it probably makes
more sense to inline it directly into the test case where it's used.



##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +479,22 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");

Review Comment:
   Actually, come to think of it, we might also want to invoke `close` on 
objects created in `getConfiguredInstance` if they throw an exception from 
`configure` 
[here](https://github.com/apache/kafka/blob/6c98544a964b40ede6bbe1b3440f8e5db96a4ad6/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L407).
   
   Perhaps we could pull this out into a reusable method and use it both here 
and there? Thinking something like:
   
   ```java
   private static void maybeClose(Object object, String name) {
       if (object instanceof AutoCloseable)
           Utils.closeQuietly((AutoCloseable) object, name);
   }
   ```
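   
   For illustration, here is a rough, self-contained sketch of how that helper 
could be shared by both call sites. It does not use Kafka's classes: 
`CleanupSketch`, `Resource`, `Configurable`, `createAndConfigure`, `createAll`, 
and the local `closeQuietly` are hypothetical stand-ins, not the real 
`AbstractConfig` or `Utils` code:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Supplier;
   
   class CleanupSketch {
       // Hypothetical stand-in for a configurable plugin interface.
       interface Configurable {
           void configure();
       }
   
       // Hypothetical plugin that counts closes and can fail during configure().
       static class Resource implements Configurable, AutoCloseable {
           static final AtomicInteger CLOSED = new AtomicInteger();
           private final boolean failOnConfigure;
   
           Resource(boolean failOnConfigure) {
               this.failOnConfigure = failOnConfigure;
           }
   
           @Override
           public void configure() {
               if (failOnConfigure)
                   throw new RuntimeException("configure failed");
           }
   
           @Override
           public void close() {
               CLOSED.incrementAndGet();
           }
       }
   
       // Local stand-in for a "close and swallow any exception" utility.
       static void closeQuietly(AutoCloseable closeable, String name) {
           try {
               closeable.close();
           } catch (Exception e) {
               System.err.println("Failed to close " + name + ": " + e);
           }
       }
   
       // The reusable helper: only close objects that are actually closeable.
       static void maybeClose(Object object, String name) {
           if (object instanceof AutoCloseable)
               closeQuietly((AutoCloseable) object, name);
       }
   
       // Call site 1: close an object whose own configure() call threw.
       static Object createAndConfigure(Supplier<Object> factory) {
           Object o = factory.get();
           try {
               if (o instanceof Configurable)
                   ((Configurable) o).configure();
           } catch (RuntimeException e) {
               maybeClose(o, "object that failed during configure");
               throw e;
           }
           return o;
       }
   
       // Call site 2: close objects created earlier in the same batch.
       static List<Object> createAll(List<Supplier<Object>> factories) {
           List<Object> objects = new ArrayList<>();
           try {
               for (Supplier<Object> factory : factories)
                   objects.add(createAndConfigure(factory));
           } catch (RuntimeException e) {
               for (Object object : objects)
                   maybeClose(object, "object created before the failure");
               throw e;
           }
           return objects;
       }
   }
   ```
   
   In this sketch, calling `createAll` with two healthy factories and a third 
whose `configure` throws leaves `Resource.CLOSED` at 3: the two earlier objects 
plus the one that failed.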



##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -45,6 +47,8 @@
 
 public class AbstractConfigTest {
 
+
+

Review Comment:
   We don't need this change; it's fine as-is.



##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR,  org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR,  org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   We don't have to test both the producer and consumer interceptor classes 
here; we just have to make sure that after an invocation of 
`AbstractConfig::getConfiguredInstances`, the right objects are cleaned up. 
(The value in this test is that we ensure that the cleanup logic is handled 
directly by the `AbstractConfig` class, instead of being performed out-of-band 
by, e.g., the `KafkaConsumer` or `KafkaProducer` classes).
   
   In addition, we don't have to use this test case or the pattern of 
`testValidInputs`/`testInvalidInputs` helper methods.
   
   We can save a lot of complexity by pulling this out into its own test case, 
and using the existing `TestConfig` class instead of creating a new one:
   
   ```java
       @Test
       public void testConfiguredInstancesClosedOnFailure() {
           try {
               Map<String, String> props = new HashMap<>();
               String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName();
               props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, threeConsumerInterceptors);
               props.put("client.id", "test");
               TestConfig testConfig = new TestConfig(props);
   
               MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(3);
               assertThrows(
                       Exception.class,
                       () -> testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, Object.class)
               );
               assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
               assertEquals(2, MockConsumerInterceptor.CLOSE_COUNT.get());
           } finally {
               MockConsumerInterceptor.resetCounters();
           }
       }
   ```


