C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1151059049
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +867,19 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
         }
     }
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
+        log.debug("Submitting offset fetch request for connector: {}", connName);
+        connectorExecutor.submit(() -> {

Review Comment:
I really like the cleanliness we get from using the admin client's `Future`-based API, but it's not the end of the world if we don't. IMO there's enough precedent for asynchronous APIs in the `Worker` class that it's not going to make the code base harder to read on its own, especially since the `startConnector` method does indirectly end up fulfilling a `Callback` passed to the `Herder` by the `ConnectorsResource` class, which is what we'd be doing here as well. But you are right that the existing use cases for an asynchronous API in the `Worker` class are for chaining together bits of herder logic with (possibly blocking) calls to connector code in between, so this case is a little different.

With regard to using up threads on the `Worker` class's executor, you've correctly noted that it is responsible for running `Connector` and `Task` instances. However, that executor doesn't have a bound on the number of threads it allocates, so we don't have to worry about tying it up and preventing it from being used for other purposes. It's possible that an excessive number of offset reset requests might cause problems for the whole JVM process by making too many calls to the executor and spinning up too many new threads, but that's equally possible if we use the `AbstractHerder` class's `connectorExecutor`, since that too uses an unbounded number of threads.
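To make the comparison concrete, here is a minimal, JDK-only sketch of the two ways a `Callback` could be fulfilled: submitting a blocking fetch to an executor, or attaching a completion handler to a future. It is not the code in the PR. `Callback` is a hypothetical local stand-in mirroring the shape of Connect's `org.apache.kafka.connect.util.Callback`, `CompletableFuture` stands in for the admin client's `KafkaFuture`, and `OffsetCallbackSketch`, `fetchViaExecutor` and `fetchViaFuture` are made-up names for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class OffsetCallbackSketch {

    // Hypothetical stand-in for Connect's Callback interface, redeclared here
    // so the sketch compiles without Kafka on the classpath.
    interface Callback<V> {
        void onCompletion(Throwable error, V result);
    }

    // Style 1: run a (possibly blocking) fetch on an executor thread and
    // complete the callback from that thread once the fetch returns.
    static void fetchViaExecutor(ExecutorService executor,
                                 Supplier<String> blockingFetch,
                                 Callback<String> cb) {
        executor.submit(() -> {
            String result;
            try {
                result = blockingFetch.get();
            } catch (RuntimeException e) {
                cb.onCompletion(e, null);
                return;
            }
            cb.onCompletion(null, result);
        });
    }

    // Style 2: the fetch already hands back a future (as admin client result
    // objects do), so we only attach a completion handler; no thread is
    // parked waiting for the result.
    static void fetchViaFuture(CompletableFuture<String> future, Callback<String> cb) {
        future.whenComplete((result, error) -> cb.onCompletion(error, result));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        fetchViaExecutor(executor, () -> "offsets via executor",
                (error, result) -> System.out.println(result));

        CompletableFuture<String> future = new CompletableFuture<>();
        fetchViaFuture(future, (error, result) -> System.out.println(result));
        future.complete("offsets via future"); // handler runs on this thread
        executor.shutdown(); // already-submitted work still runs
    }
}
```

In the future-based variant no thread sits blocked waiting for the result; the handler simply runs on whichever thread completes the future.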
So in user-facing terms, I don't believe there's a significant difference (though the asynchronous API may be slightly better for sink connectors, since it spins up one fewer thread; not a huge deal). We might consider using a different executor altogether and putting a bound on the number of threads that can be allocated for it, but since we invoke connector methods when altering or resetting offsets, there's always a chance that the invocation hangs. If a user then repeatedly reissues the same request after getting a timeout error, the hung calls could easily occupy every thread and lock up the entire executor. This is also why we chose to use an unbounded number of threads when addressing [KAFKA-9374](https://issues.apache.org/jira/browse/KAFKA-9374) (the "hung connector" bug).

TL;DR: Do whatever you think is cleanest, don't worry too much about precedents in the code base, and it'll probably be fine 👍
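The lock-up scenario can be reproduced with a small JDK-only sketch. Tasks that wait forever on a latch stand in for connector methods that hang; `HungTaskSketch` and `quickTaskRuns` are hypothetical names, and `Executors.newFixedThreadPool` is only an illustrative bounded alternative, not something proposed in the PR. The comparison assumes the unbounded executors behave like `Executors.newCachedThreadPool()`, which starts a new thread whenever no idle one is available.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HungTaskSketch {

    // Submits `hungCalls` tasks that block until interrupted (standing in for
    // connector methods that never return), then one quick task, and reports
    // whether the quick task ran within the timeout. Shuts the executor down
    // afterwards, interrupting the "hung" tasks so the JVM can exit.
    static boolean quickTaskRuns(ExecutorService executor, int hungCalls, long timeoutMs) {
        CountDownLatch never = new CountDownLatch(1); // never counted down
        for (int i = 0; i < hungCalls; i++) {
            executor.submit(() -> {
                never.await();
                return null;
            });
        }
        CountDownLatch ran = new CountDownLatch(1);
        executor.submit(ran::countDown);
        try {
            return ran.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Unbounded: the cached pool starts a new thread for the quick task,
        // so the hung calls cannot starve it.
        System.out.println("cached pool:    " + quickTaskRuns(Executors.newCachedThreadPool(), 4, 500));
        // Bounded: four hung calls occupy all four threads, so the quick task
        // stays queued until the timeout expires.
        System.out.println("fixed pool (4): " + quickTaskRuns(Executors.newFixedThreadPool(4), 4, 500));
    }
}
```

Run as-is, it should report `true` for the cached pool and `false` for the fixed pool: once every thread of a bounded pool is held by a hung call, later work (including a retried request) just waits in the queue.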