[ https://issues.apache.org/jira/browse/KAFKA-12726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17337024#comment-17337024 ]
Chris Egerton commented on KAFKA-12726:
---------------------------------------

Okay, that makes the problem clearer (y). Maybe we can update the ticket title/description to match, since I believe we've ruled out the original issue of hung tasks interfering with the lifecycle of their successors?

As far as resource cleanup goes:
* I agree that pulling the task out of the {{task-count}} metric is warranted; the current behavior sounds buggy and hopefully that change shouldn't be too controversial. And, if it is and people want more insight into zombies that may be running on their workers, [shameless plug|https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks].
* If we want to go further, there's already precedent for proactively cleaning up other resources after tasks exceed their graceful shutdown timeout; [a (partial) fix|https://github.com/apache/kafka/pull/10016] for KAFKA-10340 involved closing the producers for source tasks on a separate thread as soon as the worker chose to abandon them. Since the issue you describe involves sink tasks specifically, we could apply similar logic to them and their consumers.
* Invoking {{Task::stop}} before the task returns control to the worker might also be a viable option, as long as we also take care not to commit offsets after that point. If the connector uses the framework-provided offset mechanisms (i.e., the internal offsets topic for source connectors and consumer offsets for sink connectors), then we can prevent data loss, and any other unexpected failures caused by the task not being able to handle a call to {{stop}} while it's in the middle of something else like a {{put}} or a {{poll}} are unlikely to have much impact, since the task will already have been abandoned by the worker and a new one may already be running in its place. In fact, it may be better to cause the hung task to fail than to allow it to try to complete gracefully and potentially interfere with a successor.
* If we go this far, the trickiest part might be deciding the order in which we try to perform these cleanup actions. If the task is already hung and we call {{stop}} on it from another thread, it's entirely possible that the new thread will also become blocked indefinitely. So it may be necessary to try to clean up resources in descending order of "likelihood to misbehave": Kafka clients first (which, unless they're configured with a troublesome interceptor of some sort, are pretty likely to return control in a timely fashion), then the {{OffsetStorageReader}} (for source tasks only), then the {{RetryWithToleranceOperator}}, then the {{transformationChain}} (which contains user-written code; now we're getting into the parts that are most likely to hang), then finally the task itself (via {{Task::stop}}). We could also invoke each of these methods on a separate thread so that one doesn't block the others, but at that point I think we might be beating a dead horse and wasting more resources than we save. A rough sketch of what that ordered cleanup could look like follows below.

Up to you how far you want to take this; just some thoughts on how resource cleanup in general might be improved for blocked tasks.
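For illustration only, here's a minimal sketch of that "descending order of likelihood to misbehave" idea. This is not actual Connect runtime code; the class name {{AbandonedTaskCleaner}} and the stand-in close hooks are hypothetical. Each step gets a bounded wait so a single hung {{close()}} or {{stop()}} can't starve the remaining steps:

{code:java}
// Hypothetical sketch, not actual Connect runtime code: attempt each cleanup step
// for an abandoned task in a caller-supplied order, giving each step a bounded
// amount of time before moving on to the next one.
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AbandonedTaskCleaner {

    private final ExecutorService cleanupExecutor = Executors.newCachedThreadPool();

    /**
     * Run each cleanup action in insertion order, waiting at most perStepTimeoutMs
     * for it to finish; a step that hangs is abandoned so later steps still run.
     */
    public void forceCleanup(Map<String, Runnable> closeActions, long perStepTimeoutMs) {
        for (Map.Entry<String, Runnable> step : closeActions.entrySet()) {
            Future<?> outcome = cleanupExecutor.submit(step.getValue());
            try {
                outcome.get(perStepTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // This step hung; cancel it and move on to the next resource
                outcome.cancel(true);
            } catch (ExecutionException e) {
                // Failures are tolerable here since the task has already been abandoned
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void shutdown() {
        cleanupExecutor.shutdownNow();
    }

    public static void main(String[] args) {
        // Stand-in close hooks; in the real runtime these would be the task's consumer,
        // RetryWithToleranceOperator, transformation chain, and finally Task::stop
        Map<String, Runnable> steps = new LinkedHashMap<>();
        steps.put("consumer", () -> System.out.println("closing consumer"));
        steps.put("retryWithToleranceOperator", () -> System.out.println("closing error operator"));
        steps.put("transformationChain", () -> System.out.println("closing transformations"));
        steps.put("task", () -> {
            // Simulate a stop() that blocks far longer than the per-step timeout
            try {
                Thread.sleep(60_000L);
            } catch (InterruptedException ignored) {
            }
        });

        AbandonedTaskCleaner cleaner = new AbandonedTaskCleaner();
        cleaner.forceCleanup(steps, 1_000L);
        cleaner.shutdown();
    }
}
{code}

Whether each step gets its own thread or they all share a single cleanup thread is a judgment call; the point is just that a bounded wait per step, attempted in the order above, keeps one misbehaving resource from blocking the cleanup of the rest.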
> misbehaving Task.stop() can prevent other Tasks from stopping
> -------------------------------------------------------------
>
>                 Key: KAFKA-12726
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12726
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.8.0
>            Reporter: Ryanne Dolan
>            Assignee: Ryanne Dolan
>            Priority: Minor
>
> We've observed a misbehaving Task fail to stop in a timely manner (e.g. stuck
> in a retry loop). Despite Connect supporting a property
> task.shutdown.graceful.timeout.ms, this is currently not enforced -- tasks
> can take as long as they want to stop, and the only consequence is an error
> message.
> Unfortunately, Workers stop Tasks sequentially, meaning that a stuck Task can
> prevent any further Tasks from stopping. Moreover, after a rebalance, these
> lingering tasks can persist along with their replacements. For example, we've
> seen a Worker's "task-count" metric double following a rebalance.
> While the Connector implementation is ultimately to blame here -- a Task
> probably shouldn't loop forever in stop() -- we believe the Connect runtime
> should handle this situation more gracefully.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)