C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092133488

##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +479,22 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");

Review Comment:
   We still need an `instanceof` check here:
   ```suggestion
                   if (object instanceof AutoCloseable) {
                       Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");
                   }
   ```

##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -137,6 +137,7 @@ import static org.mockito.Mockito.when;
 
 public class KafkaProducerTest {
+    private final int targetInterceptor = 3;

Review Comment:
   Nit: If we're not using this field in any other test, probably makes more sense to inline directly into the test case it's used in.

##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -168,6 +169,7 @@ public class KafkaConsumerTest {
     private final int defaultApiTimeoutMs = 60000;
     private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
     private final int heartbeatIntervalMs = 1000;
+    private final int targetInterceptor = 3;

Review Comment:
   Nit: If we're not using this field in any other test, probably makes more sense to inline directly into the test case it's used in.

##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +479,22 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");

Review Comment:
   Actually, come to think of it, we might also want to invoke `close` on objects created in `getConfiguredInstance` if they throw an exception from `configure` [here](https://github.com/apache/kafka/blob/6c98544a964b40ede6bbe1b3440f8e5db96a4ad6/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L407). Perhaps we could pull this out into a reusable method and use it both here and there? Thinking something like:
   ```java
   private static void maybeClose(Object object, String name) {
       if (object instanceof AutoCloseable)
           Utils.closeQuietly((AutoCloseable) object, name);
   }
   ```

##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -45,6 +47,8 @@
 public class AbstractConfigTest {
 
+
+

Review Comment:
   We don't need this change; it's fine as-is.

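A minimal, self-contained sketch of the `maybeClose` idea from the `AbstractConfig.java` comment above, for anyone who wants to try it outside the Kafka codebase. The `closeQuietly` method here is a local stand-in for Kafka's `Utils.closeQuietly`, and `CountingResource` is a hypothetical class that exists only for illustration; neither is part of the PR. The explicit cast is there because the parameter is declared as a plain `Object`, and a classic `instanceof` check does not narrow its type.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class MaybeCloseSketch {

    /** Hypothetical resource that counts how many times close() was called. */
    static class CountingResource implements AutoCloseable {
        static final AtomicInteger CLOSE_COUNT = new AtomicInteger();

        @Override
        public void close() {
            CLOSE_COUNT.incrementAndGet();
        }
    }

    /** Local stand-in for Utils.closeQuietly: closes the resource and swallows any failure. */
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Throwable t) {
            System.err.println("Failed to close " + name + ": " + t);
        }
    }

    /** Closes the object only if it is AutoCloseable; anything else is ignored. */
    static void maybeClose(Object object, String name) {
        if (object instanceof AutoCloseable) {
            closeQuietly((AutoCloseable) object, name);
        }
    }

    public static void main(String[] args) {
        maybeClose(new CountingResource(), "closeable resource"); // closed
        maybeClose("just a string", "non-closeable object");      // silently skipped
        System.out.println("close calls: " + CountingResource.CLOSE_COUNT.get());
    }
}
```

With a helper shaped like this, the same call could be used both in the cleanup loop and at the `configure` failure site, without either caller needing to know whether the plugin is closeable.
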
##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR, org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR, org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   We don't have to test both the producer and consumer interceptor classes here; we just have to make sure that after an invocation of `AbstractConfig::getConfiguredInstances`, the right objects are cleaned up. (The value in this test is that we ensure that the cleanup logic is handled directly by the `AbstractConfig` class, instead of being performed out-of-band by, e.g., the `KafkaConsumer` or `KafkaProducer` classes).

   In addition, we don't have to use this test case or the pattern of `testValidInputs`/`testInvalidInputs` helper methods.
   We can save a lot of complexity by pulling this out into its own test case, and using the existing `TestConfig` class instead of creating a new one:
   ```java
   @Test
   public void testConfiguredInstancesClosedOnFailure() {
       try {
           Map<String, String> props = new HashMap<>();
           String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
                   + MockConsumerInterceptor.class.getName() + ", "
                   + MockConsumerInterceptor.class.getName();
           props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, threeConsumerInterceptors);
           props.put("client.id", "test");
           TestConfig testConfig = new TestConfig(props);

           MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(3);
           assertThrows(
                   Exception.class,
                   () -> testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, Object.class)
           );
           assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
           assertEquals(2, MockConsumerInterceptor.CLOSE_COUNT.get());
       } finally {
           MockConsumerInterceptor.resetCounters();
       }
   }
   ```
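
To illustrate why the proposed test expects three `configure` calls but only two `close` calls, here is a standalone sketch of the close-on-failure loop. It assumes cleanup happens only in the loop that builds the list, so the instance whose `configure` call throws never reaches the list and is not closed. `FakePlugin` and `createAll` are hypothetical names for illustration, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CloseOnFailureSketch {
    static final AtomicInteger CONFIG_COUNT = new AtomicInteger();
    static final AtomicInteger CLOSE_COUNT = new AtomicInteger();

    /** Hypothetical plugin whose configure() throws once the global count hits a threshold. */
    static class FakePlugin implements AutoCloseable {
        void configure(int throwThreshold) {
            if (CONFIG_COUNT.incrementAndGet() == throwThreshold) {
                throw new IllegalStateException("configure failed at threshold " + throwThreshold);
            }
        }

        @Override
        public void close() {
            CLOSE_COUNT.incrementAndGet();
        }
    }

    /** Creates and configures plugins; on failure, closes the ones already created and rethrows. */
    static List<FakePlugin> createAll(int howMany, int throwThreshold) {
        List<FakePlugin> created = new ArrayList<>();
        try {
            for (int i = 0; i < howMany; i++) {
                FakePlugin plugin = new FakePlugin();
                plugin.configure(throwThreshold);
                created.add(plugin); // only reached if configure() succeeded
            }
        } catch (RuntimeException e) {
            for (FakePlugin plugin : created) {
                try {
                    plugin.close();
                } catch (RuntimeException ignored) {
                    // Mirror "close quietly": a failing close must not mask the original error.
                }
            }
            throw e;
        }
        return created;
    }

    public static void main(String[] args) {
        try {
            createAll(3, 3);
        } catch (IllegalStateException expected) {
            System.out.println("configured=" + CONFIG_COUNT.get() + ", closed=" + CLOSE_COUNT.get());
        }
    }
}
```

Under these assumptions the third plugin is configured but never added to the list, which gives 3 configure calls and 2 close calls. If the failing instance were also closed at the `configure` failure site, as floated in the earlier comment, the close count in this sketch would be 3 instead.
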