[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton resolved KAFKA-14565. ----------------------------------- Resolution: Fixed > Interceptor Resource Leak > ------------------------- > > Key: KAFKA-14565 > URL: https://issues.apache.org/jira/browse/KAFKA-14565 > Project: Kafka > Issue Type: Improvement > Components: clients > Reporter: Terry Beard > Assignee: Terry Beard > Priority: Major > Fix For: 3.5.0 > > > The Consumer and Producer interceptor interfaces and their corresponding > Kafka Consumer and Producer constructors do not adequately support cleanup of > underlying interceptor resources. > Currently within the Kafka Consumer and Kafka Producer constructors, the > *AbstractConfig.getConfiguredInstances()* is delegated responsibility for > both creating and configuring each interceptor listed in the > interceptor.classes property and returns a configured > *List<ConsumerInterceptor<K,V>>* interceptors. > This dual responsibility for both creation and configuration is problematic > when it involves multiple interceptors where at least one interceptor's > configure method implementation creates and/or depends on objects which > creates threads, connections or other resources which requires clean up and > the subsequent interceptor's configure method raises a runtime exception. > This raising of the runtime exception produces a resource leakage in the > first interceptor as the interceptor container i.e. > ConsumerInterceptors/ProducerInterceptors is never created and therefore the > first interceptor's and really any interceptor's close method are never > called. > To help ensure the respective container interceptors are able to invoke their > respective interceptor close methods for proper resource clean up, I propose > two approaches: > +*PROPOSAL 1*+ > Define a default *open* or *configureWithResources()* or *acquireResources()* > method with no implementation and check exception on the respective > Consumer/Producer interceptor interfaces. This method as a part the > interceptor life cycle management will be responsible for creating threads > and/or objects which utilizes threads, connections or other resource which > requires clean up. Additionally, this default method enables implementation > optionality as it's empty default behavior means it will do nothing when > unimplemented mitigating backwards compatibility impact to exiting > interceptors. Finally, the Kafka Consumer/Producer Interceptor containers > will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* > or *maybeAcquireResources* method which also throws a checked exception. > See below code excerpt for the Consumer/Producer constructor: > {code:java} > List<ConsumerInterceptor<K, V>> interceptorList = (List) > config.getConfiguredInstances( > ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, > ConsumerInterceptor.class, > Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); > this.interceptors = new ConsumerInterceptors<>(interceptorList); > this.interceptors.maybeConfigureWithResources(); > {code} > +*PROPOSAL 2*+ > To avoid changing any public interfaces and the subsequent KIP process, we can > * Create a class which inherits or wraps AbstractConfig that contains a new > method which will return a ConfiguredInstanceResult class. This > ConfiguredInstanceResult class will contain an optional list of successfully > created interceptors and/or exception which occurred while calling each > Interceptor::configure. Additionally, it will contain a helper method to > rethrow an exception as well as a method which returns the underlying > exception. The caller is expected to handle the exception and perform clean > up e.g. call Interceptor::close on each interceptor in the list provided by > the ConfiguredInstanceResult class. > * Automatically invoke {{close}} on any {{Closeable}} or {{AutoCloseable}} > instances if/when a failure occurs > * Add a new overloaded {{getConfiguredInstance}} / > {{getConfiguredInstances}} variant that allows users to specify whether > already-instantiated classes should be closed or not when a failure occurs > * Add a new exception type to the public API that includes a list of all of > the successfully-instantiated (and/or successfully-configured) instances > before the error was encountered so that callers can choose how to handle the > failure however they want (and possibly so that instantiation/configuration > can be attempted on every class before throwing the exception) > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)