Hi all,

I'd like to solicit input from users and maintainers on a problem we've been dealing with in the source task cleanup logic.
If you'd like to pore over some Jira history, here's the primary link: https://issues.apache.org/jira/browse/KAFKA-15090

To summarize, we accidentally introduced a breaking change for Kafka Connect in https://github.com/apache/kafka/pull/9669. Before that change, the SourceTask::stop method [1] was invoked on a separate thread from the one that did the actual data processing for the task (polling the task for records, transforming and converting those records, then sending them to Kafka). After that change, we began invoking SourceTask::stop on the same thread that handled data processing for the task. As a result, tasks that blocked indefinitely in the SourceTask::poll method [2], expecting to stop blocking once SourceTask::stop was invoked, were no longer capable of graceful shutdown and could even hang forever.

This breaking change was introduced in the 3.0.0 release, a little over two years ago. Since then, source connectors may have been modified to adapt to the change in behavior by the Connect framework. For that reason, we are hesitant to go back to the prior logic of invoking SourceTask::stop on a separate thread (see the linked Jira ticket for more detail on this front).

In https://github.com/apache/kafka/pull/14316, I proposed that we begin interrupting the data processing thread for the source task after it has exhausted its graceful shutdown timeout (i.e., when the Kafka Connect runtime decides to cancel the task [3], [4], [5]). I believe this change is fairly non-controversial: once a task has failed to shut down gracefully, the runtime can and should do whatever it wants to force a shutdown, graceful or otherwise.

With all that context out of the way, the question I'd like to ask is: do we believe it's also appropriate to interrupt the data processing thread when the task is scheduled for shutdown [6], [7]?
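To make the shutdown sequence above a little more concrete, here is a minimal, framework-free model in Java. It does not use the Kafka Connect API. SketchTask, BlockingTask, BoundedTask and shutDown are hypothetical names, and the real runtime logic (offset commits, error handling, timeout configuration) is considerably more involved.

The sketch makes three assumptions, as described above:
- poll() and stop() run on the same thread.
- The runtime first signals the task to stop and waits out a graceful timeout.
- Only then does it interrupt the thread, as proposed in PR 14316.

The two hypothetical tasks contrast a poll() that blocks indefinitely with one that returns null regularly, as the SourceTask::poll Javadoc asks.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class ShutdownSketch {

    // Hypothetical stand-in for a source task; NOT the Kafka Connect SourceTask API.
    abstract static class SketchTask {
        final BlockingQueue<String> records = new LinkedBlockingQueue<>();
        final CountDownLatch enteredPoll = new CountDownLatch(1);
        volatile boolean stopped = false;

        // Returns records, or null if none are currently available.
        abstract List<String> poll() throws InterruptedException;

        // A real task would release resources here.
        void stop() {
            stopped = true;
        }
    }

    // Problematic pattern: blocks until a record arrives, however long that takes.
    static class BlockingTask extends SketchTask {
        @Override
        List<String> poll() throws InterruptedException {
            enteredPoll.countDown();
            return List.of(records.take()); // only an interrupt can end this wait early
        }
    }

    // Documented pattern: waits briefly, then returns null to hand control back.
    static class BoundedTask extends SketchTask {
        @Override
        List<String> poll() throws InterruptedException {
            enteredPoll.countDown();
            String record = records.poll(10, TimeUnit.MILLISECONDS);
            return record == null ? null : List.of(record);
        }
    }

    // Models the runtime: poll() and stop() share one thread; the task is asked to
    // stop, given a graceful window, and interrupted only if it is still running.
    static String shutDown(SketchTask task, long gracefulTimeoutMs) {
        AtomicBoolean stopping = new AtomicBoolean(false);
        AtomicReference<String> outcome = new AtomicReference<>("still running");

        Thread worker = new Thread(() -> {
            try {
                while (!stopping.get()) {
                    task.poll(); // records would be transformed, converted and sent here
                }
                outcome.set("exited gracefully");
            } catch (InterruptedException e) {
                // The thread exits right after this, so the interrupt status is not restored.
                outcome.set("interrupted in poll()");
            } finally {
                // Same thread as poll(): stop() cannot run until poll() returns or throws.
                task.stop();
            }
        });

        try {
            worker.start();
            task.enteredPoll.await();       // wait until the worker is inside poll()
            stopping.set(true);             // the task is scheduled for shutdown
            worker.join(gracefulTimeoutMs); // graceful shutdown window
            if (worker.isAlive()) {
                worker.interrupt();         // cancellation: interrupt the processing thread
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while shutting down the sketch task", e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println("blocking poll(): " + shutDown(new BlockingTask(), 1000));
        System.out.println("bounded poll():  " + shutDown(new BoundedTask(), 1000));
    }
}
```

You can run this with `java ShutdownSketch.java` (JDK 11+). In this model, the blocking task only leaves poll() once its thread is interrupted. The bounded task exits within the graceful window without needing an interrupt. Interrupting as soon as the task is scheduled for shutdown, as asked above, would correspond to calling worker.interrupt() immediately after stopping.set(true).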
This interruption would ideally be followed up by a graceful shutdown of the task, which may require the Kafka Connect runtime to handle a potential InterruptedException from SourceTask::poll. Other exceptions (such as a wrapped InterruptedException) would be impossible to handle gracefully, and may lead to spurious error messages in the logs and failed final offset commits for connectors that do not work well with this new behavior.

Finally, one important note: in the official documentation for SourceTask::poll, we do already state that this method should not block for too long:

> If no data is currently available, this method should block but return control to the caller regularly (by returning null) in order for the task to transition to the PAUSED state if requested to do so.

Looking forward to everyone's thoughts on this tricky issue!

Cheers,

Chris

[1] - https://kafka.apache.org/36/javadoc/org/apache/kafka/connect/source/SourceTask.html#stop()
[2] - https://kafka.apache.org/36/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()
[3] - https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1037
[4] - https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L129-L136
[5] - https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L284-L297
[6] - https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1014
[7] - https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L112-L127