C0urante commented on code in PR #12802:
URL: https://github.com/apache/kafka/pull/12802#discussion_r1047862099
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1645,6 +1646,8 @@ private void startAndStop(Collection<Callable<Void>> callables) {
startAndStopExecutor.invokeAll(callables);
} catch (InterruptedException e) {
// ignore
+ } catch (RejectedExecutionException re) {
+ log.error("startAndStopExecutor already shutdown or full. Not invoking explicit connector/task shutdown");
Review Comment:
Thank you for looking into this. It's these kinds of details that make
maintaining this part of the code base tricky, and I really appreciate the time
you've put into diving deep on this one!
I've revisited some of the logic here, and the assumption doesn't quite hold:
simply handling a `RejectedExecutionException` thrown in `startAndStop` won't
guarantee that the cleanup in `halt` (where we invoke, among other things,
`member::stop`) is allowed to proceed cleanly.
The only way to externally trigger worker shutdown is to begin terminating
the JVM (usually via `SIGINT`, or ctrl+C). At that point, threads are not
given time to terminate gracefully (even if they are non-daemon threads);
instead, the only thing guaranteed to take place before the JVM exits
is the execution of any shutdown hooks that have been registered. In this case, we
add a [shutdown hook in the `Connect`
class](https://github.com/apache/kafka/blob/67c72596afe58363eceeb32084c5c04637a33831/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java#L95-L105)
that [shuts down the REST server and invokes
`Herder::stop`](https://github.com/apache/kafka/blob/67c72596afe58363eceeb32084c5c04637a33831/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java#L68-L69).
So, even if we catch a `RejectedExecutionException` and handle it
gracefully, there's still a good chance that the JVM will terminate before we
can successfully invoke (among other things) `Worker::stop`.
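The failure mode itself is easy to reproduce in isolation: once an `ExecutorService` has been shut down, submitting new work to it is rejected. A minimal, self-contained sketch (illustrative names, not the herder's actual code):

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectedAfterShutdownDemo {
    // Reproduces the failure mode: once an ExecutorService is shut down,
    // invokeAll rejects new tasks with a RejectedExecutionException.
    public static boolean rejectsAfterShutdown() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        try {
            executor.invokeAll(List.of((Callable<Void>) () -> null));
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }
}
```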
With that in mind, here's my proposal:
1. Modify the shutdown logic in `halt` to use
`Worker::stopAndAwaitConnectors` and `Worker::stopAndAwaitTasks` (the no-args
variants of each) instead of using `startAndStop` at all. This will make it
easier to reason about how long graceful shutdown should take, and reduce the
risk of a `RejectedExecutionException` being thrown
2. Increase the timeout for `herderExecutor::awaitTermination` to a value
large enough to accommodate a read to the end of the config topic (which is
currently bounded by `workerSyncTimeoutMs`), plus successfully attempting to
stop all connectors and tasks (which should now be bounded by 5 seconds (the
hardcoded graceful shutdown timeout for connectors) plus
`workerTasksShutdownTimeoutMs`)
3. Add a `catch` clause to the body of `startAndStop` that swallows a
`RejectedExecutionException` and just logs a `DEBUG`-level message in that case,
but only if we're already in the process of shutting down (i.e., `stopping.get()` is
`true`), and rethrows the exception otherwise
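As a rough sketch of the timeout arithmetic in point 2 above (the constant and parameter names here are illustrative, not the actual config keys):

```java
// Hypothetical sketch of proposal 2: size the herder executor's termination
// timeout to cover a read to the end of the config topic plus connector and
// task shutdown. CONNECTOR_GRACEFUL_SHUTDOWN_MS mirrors the hardcoded
// 5-second connector shutdown timeout mentioned above.
public class HerderShutdownTimeout {
    static final long CONNECTOR_GRACEFUL_SHUTDOWN_MS = 5_000L;

    static long shutdownTimeoutMs(long workerSyncTimeoutMs, long workerTasksShutdownTimeoutMs) {
        // Config topic read + connector shutdown + task shutdown
        return workerSyncTimeoutMs + CONNECTOR_GRACEFUL_SHUTDOWN_MS + workerTasksShutdownTimeoutMs;
    }
}
```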
The idea here is to still attempt to gracefully shut down the worker, but
also handle the edge case where we time out while trying to do so and the
herder tick thread attempts to access a now-closed resource.
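For the `catch` clause in point 3, here's a hypothetical sketch (field and method names modeled loosely on the herder's, not the actual `DistributedHerder` code):

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of proposal 3, modeled loosely on
// DistributedHerder::startAndStop; names and structure are illustrative only.
public class StartAndStopSketch {
    final AtomicBoolean stopping = new AtomicBoolean(false);
    final ExecutorService startAndStopExecutor = Executors.newFixedThreadPool(2);

    // Simulates the herder entering shutdown: the stopping flag is set and the
    // executor is closed, so any further submissions will be rejected.
    void beginShutdown() {
        stopping.set(true);
        startAndStopExecutor.shutdown();
    }

    void startAndStop(List<Callable<Void>> callables) {
        try {
            startAndStopExecutor.invokeAll(callables);
        } catch (InterruptedException e) {
            // ignore, matching the existing behavior
        } catch (RejectedExecutionException e) {
            if (stopping.get()) {
                // Shutdown is already in progress, so the executor may be
                // closed; the real herder would log a DEBUG-level message here.
            } else {
                // Rejection outside of shutdown is unexpected; surface it.
                throw e;
            }
        }
    }
}
```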
Do you think this addresses the issue sufficiently?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]