Hi all,

I'd like to solicit input from users and maintainers on a problem we've
been dealing with for source task cleanup logic.

If you'd like to pore over some Jira history, here's the primary link:
https://issues.apache.org/jira/browse/KAFKA-15090

To summarize, we accidentally introduced a breaking change for Kafka
Connect in https://github.com/apache/kafka/pull/9669. Before that change,
the SourceTask::stop method [1] would be invoked on a separate thread from
the one that did the actual data processing for the task (polling the task
for records, transforming and converting those records, then sending them
to Kafka). After that change, we began invoking SourceTask::stop on the
same thread that handled data processing for the task. This had the effect
that tasks which blocked indefinitely in the SourceTask::poll method [2]
with the expectation that they could stop blocking when SourceTask::stop
was invoked would no longer be capable of graceful shutdown, and may even
hang forever.

This breaking change was introduced in the 3.0.0 release, a little over two
three ago. Since then, source connectors may have been modified to adapt to
the change in behavior by the Connect framework. As a result, we are
hesitant to go back to the prior logic of invoking SourceTask::stop on a
separate thread (see the linked Jira ticket for more detail on this front).

In https://github.com/apache/kafka/pull/14316, I proposed that we begin
interrupting the data processing thread for the source task after it had
exhausted its graceful shutdown timeout (i.e., when the Kafka Connect
runtime decides to cancel [3], [4], [5] the task). I believe this change is
fairly non-controversial--once a task has failed to shut down gracefully,
the runtime can and should do whatever it wants to force a shutdown,
graceful or otherwise.

With all that context out of the way, the question I'd like to ask is: do
we believe it's also appropriate to interrupt the data processing thread
when the task is scheduled for shutdown [6], [7]? This interruption would
ideally be followed up by a graceful shutdown of the task, which may
require the Kafka Connect runtime to handle a potential
InterruptedException from SourceTask::poll. Other exceptions (such as a
wrapped InterruptedException) would be impossible to handle gracefully, and
may lead to spurious error messages in the logs and failed final offset
commits for connectors that do not work well with this new behavior.

Finally, one important note: in the official documentation for
SourceTask::poll, we do already state that this method should not block for
too long:

> If no data is currently available, this method should block but return
control to the caller regularly (by returning null) in order for the task
to transition to the PAUSED state if requested to do so.

Looking forward to everyone's thoughts on this tricky issue!

Cheers,

Chris

[1] -
https://kafka.apache.org/36/javadoc/org/apache/kafka/connect/source/SourceTask.html#stop()
[2] -
https://kafka.apache.org/36/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()
[3] -
https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1037
[4] -
https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L129-L136
[5] -
https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L284-L297
[6] -
https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1014
[7] -
https://github.com/apache/kafka/blob/c5ee82cab447b094ad7491200afa319515d5f467/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L112-L127

Reply via email to