[ https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867397#comment-17867397 ]
Chris Egerton commented on KAFKA-17044:
---------------------------------------

The crux of the issue is that invoking {{stop}} on a connector that's blocked in {{start}} is likely to cause more issues than it fixes. The most trivial way of preventing race conditions with that kind of API would be for connector developers to make both methods {{synchronized}}, which would have the same effect as today (where blocking in {{start}} would prevent {{stop}} from being invoked). More fine-grained synchronization is obviously possible, but comes with a large amount of complexity that IMO isn't worth the implementation cost. Instead, connector developers can and should try to make {{start}} complete as quickly as possible, and if necessary, they can trigger follow-up operations using the existing API hook to trigger task reconfiguration.

I'm also not certain that the Confluent JDBC connector is the best counterexample: does it actually perform synchronous database queries in its {{start}} method? It's been years since I've looked at that code base, but IIRC it kicks off a separate thread to do that work, which will then invoke {{context.requestTaskReconfiguration}} if/when a change in the set of to-be-consumed tables is detected. If that's correct, it seems like the connector might be a good example of the flow I'm suggesting, instead of a flow where it's expected that {{stop}} potentially be invoked before {{start}} has completed.

> Connector deletion can lead to resource leak during a long running connector
> startup
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-17044
> URL: https://issues.apache.org/jira/browse/KAFKA-17044
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: Bhagyashree
> Priority: Major
>
> We have identified a gap in the shutdown flow for the connector worker.
> If the connector is in
> [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
> state and still executing the
> [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
> method, a DELETE API call would invoke
> [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
> and [notify()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297],
> but the connector worker would not shut down immediately. This happens because
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> is a blocking call, and control reaches
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
> in
> [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
> only after the
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> call has completed. This results in a gap in the delete flow where the
> connector is not shut down immediately, leaving its resources running.
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> keeps running; only when the execution of
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> completes does control reach
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176],
> after which
> [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
> of the connector worker is invoked.
>
> This seems similar to what has been identified for connector tasks as part of
> https://issues.apache.org/jira/browse/KAFKA-14725.
>
> *Steps to repro*
> 1. Start a connector with a time-consuming operation in its
> [connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> call.
> 2. Call the DELETE API to delete this connector.
> 3. The connector is deleted only after
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> completes.
>
> The issue was observed when a connector was configured to retry a DB
> connection for some time.
>
> *Current Behaviour*: The connector did not shut down until the
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> method completed.
>
> *Expected Behaviour*: The connector should abort what it is doing and
> shut down as requested by the DELETE call.
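To reproduce the ordering problem from the issue description without a full Connect cluster, here is a minimal, self-contained Java model of that flow. It is a hypothetical simplification, not the real {{WorkerConnector}}: {{ModelWorkerConnector}}, {{connectorStart}} and {{shutdownDelayMillis}} are illustrative names, and a {{Thread.sleep}} stands in for a {{start()}} that keeps retrying a database connection. Because {{shutdown()}} only sets a flag and calls {{notify()}}, the worker thread does not act on it until {{start()}} returns and the thread reaches {{wait()}}, so the measured delay is roughly the remaining startup time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the lifecycle described in the issue.
// It is NOT the real WorkerConnector; only the ordering of calls is mirrored.
class ModelWorkerConnector implements Runnable {
    private final long startDurationMillis;       // how long the modeled start() blocks
    private final CountDownLatch startEntered = new CountDownLatch(1);
    private final CountDownLatch stopped = new CountDownLatch(1);
    private boolean stopping = false;             // guarded by "this"
    private volatile long stoppedAtNanos;

    ModelWorkerConnector(long startDurationMillis) {
        this.startDurationMillis = startDurationMillis;
    }

    // Stands in for a connector start() that keeps retrying a DB connection.
    private void connectorStart() throws InterruptedException {
        startEntered.countDown();
        Thread.sleep(startDurationMillis);
    }

    @Override
    public void run() {
        try {
            connectorStart(); // shutdown() has no effect on this call
            synchronized (this) {
                while (!stopping) {
                    wait(); // reached only after start() returns
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            stoppedAtNanos = System.nanoTime(); // stands in for doShutdown()
            stopped.countDown();
        }
    }

    // Invoked by another thread, like the DELETE request handler.
    synchronized void shutdown() {
        stopping = true;
        notify(); // nobody is waiting yet if run() is still inside connectorStart()
    }

    // Returns how long after shutdown() the modeled worker actually stopped.
    static long shutdownDelayMillis(long startDurationMillis) {
        ModelWorkerConnector worker = new ModelWorkerConnector(startDurationMillis);
        Thread thread = new Thread(worker, "model-connector");
        thread.start();
        try {
            worker.startEntered.await(); // the DELETE arrives while start() is running
            long requestedAt = System.nanoTime();
            worker.shutdown();
            worker.stopped.await();
            thread.join();
            return TimeUnit.NANOSECONDS.toMillis(worker.stoppedAtNanos - requestedAt);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped ~" + shutdownDelayMillis(500) + " ms after shutdown()");
    }
}
```

With a 500 ms modeled startup, the reported delay should be close to 500 ms. The {{stopping}} flag is what keeps the early {{notify()}} from being lost; without it, the worker could block in {{wait()}} indefinitely.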
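The point in the comment about making both methods {{synchronized}} can be illustrated with a small hypothetical connector (not a real Connect class) whose {{start}} and {{stop}} lock the same monitor. A {{stop}} call issued while {{start}} is still running simply blocks until {{start}} returns, which is the same ordering the runtime already produces today. The latch exists only to make the demo deterministic by guaranteeing that {{start}} holds the monitor before {{stop}} is called.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical connector whose start() and stop() are both synchronized,
// i.e. the "most trivial" guard against stop() racing with start().
class SynchronizedConnector {
    private final long startDurationMillis;
    private final CountDownLatch startEntered = new CountDownLatch(1);
    private boolean running;                      // guarded by "this"

    SynchronizedConnector(long startDurationMillis) {
        this.startDurationMillis = startDurationMillis;
    }

    synchronized void start() {
        startEntered.countDown(); // the monitor is now held by start()
        try {
            Thread.sleep(startDurationMillis); // slow startup work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        running = true;
    }

    synchronized void stop() {
        running = false; // cannot execute until start() releases the monitor
    }

    // Calls stop() while start() is in progress; returns how long stop() blocked.
    static long stopBlockedMillis(long startDurationMillis) {
        SynchronizedConnector connector = new SynchronizedConnector(startDurationMillis);
        Thread starter = new Thread(connector::start, "connector-start");
        starter.start();
        try {
            connector.startEntered.await();
            long before = System.nanoTime();
            connector.stop();
            long blocked = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - before);
            starter.join();
            return blocked;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("stop() blocked for ~" + stopBlockedMillis(500) + " ms");
    }
}
```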
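The flow suggested in the comment (keep {{start}} fast and do slow work on a separate thread that calls {{context.requestTaskReconfiguration}} when something changes) might look roughly like the following sketch. To stay self-contained it defines a local {{ReconfigurationContext}} interface mirroring the {{requestTaskReconfiguration()}} method of Connect's {{ConnectorContext}}; {{NonBlockingStartConnector}}, the {{tableLister}} supplier and the polling interval are assumptions for illustration, not the Confluent JDBC connector's actual implementation.

```java
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Local stand-in for the requestTaskReconfiguration() hook on Kafka Connect's
// ConnectorContext, redefined here so the sketch compiles without Kafka on the classpath.
interface ReconfigurationContext {
    void requestTaskReconfiguration();
}

// Hypothetical connector: start() only launches a monitor thread and returns;
// the slow work (e.g. listing tables) happens on that thread.
class NonBlockingStartConnector {
    private final ReconfigurationContext context;
    private final Supplier<Set<String>> tableLister; // hypothetical slow DB query
    private final long pollIntervalMillis;
    private volatile Set<String> tables = Set.of();
    private volatile Thread monitor;

    NonBlockingStartConnector(ReconfigurationContext context,
                              Supplier<Set<String>> tableLister,
                              long pollIntervalMillis) {
        this.context = context;
        this.tableLister = tableLister;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    void start() {
        Thread t = new Thread(this::monitorTables, "table-monitor");
        t.setDaemon(true);
        monitor = t;
        t.start(); // return immediately instead of blocking on the database
    }

    private void monitorTables() {
        while (!Thread.currentThread().isInterrupted()) {
            Set<String> latest = tableLister.get(); // may be slow or retry
            if (!latest.equals(tables)) {
                tables = latest;
                context.requestTaskReconfiguration(); // ask the runtime for new task configs
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                return; // stop() was called
            }
        }
    }

    Set<String> currentTables() {
        return tables;
    }

    // Because start() has already returned, stop() is never stuck behind it.
    void stop() {
        Thread t = monitor;
        if (t == null) return;
        t.interrupt();
        try {
            t.join(TimeUnit.SECONDS.toMillis(5)); // bounded wait for the monitor thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) {
        AtomicInteger reconfigurations = new AtomicInteger();
        NonBlockingStartConnector connector = new NonBlockingStartConnector(
                reconfigurations::incrementAndGet,
                () -> { sleepQuietly(300); return Set.of("orders", "customers"); },
                50);
        long before = System.nanoTime();
        connector.start();
        System.out.println("start() returned after ~"
                + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - before) + " ms");
        sleepQuietly(900); // give the monitor time to finish a query
        connector.stop();
        System.out.println("tables=" + connector.currentTables()
                + ", reconfigurations=" + reconfigurations.get());
    }
}
```

Here {{stop}} returns promptly only if the slow work responds to interruption (or to some other cancellation signal). A query that blocks uninterruptibly would make {{stop}} wait up to the 5-second join timeout and then return, leaving the daemon thread to finish on its own.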