C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1151059049


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +867,19 @@ public List<ConfigKeyInfo> connectorPluginConfig(String 
pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> 
cb) {
+        log.debug("Submitting offset fetch request for connector: {}", 
connName);
+        connectorExecutor.submit(() -> {

Review Comment:
   I really like the cleanliness that we get from using the admin client's 
`Future`-based API but it's not the end of the world if we don't. IMO there's 
enough of a precedent for asynchronous APIs in the `Worker` class (especially 
since the `startConnector` method does indirectly end up fulfilling a 
`Callback` passed to the `Herder` by the `ConnectorsResource` class, which is 
what we'd be doing here as well) that it's not going to make the code base 
harder to read on its own. But you are right that the existing use cases for an 
asynchronous API in the `Worker` class are for chaining together bits of herder 
logic with (possibly-blocking) calls to connector code in between, so this case 
is a little different.
   
   With regards to using up threads threads on the `Worker` class's executor, 
you've correctly noted that this one is responsible for running `Connector` and 
`Task` instances. However, that executor doesn't have a bound on the number of 
threads it allocates, so we don't have to worry about tying it up and 
preventing it from being used for other purposes. It's possible that an 
excessive number of offset reset requests might cause problems for the whole 
JVM process by making too many calls to the executor and spinning up too many 
new threads, but that's equally possible if we use the `AbstractHerder` class's 
`connectorExecutor`, since that too uses an unbounded number of threads. So in 
user-facing terms, I don't believe there's a significant difference (though 
maybe the asynchronous API is a bit better for sink connectors since it spins 
up one less thread--not a huge deal).
   
   We might consider using a different executor altogether and putting a bound 
on the number of threads that can be allocated for it, but since we invoke 
connector methods when altering or resetting offsets, there's always the 
possibility that that invocation hangs, and if a user repeatedly issues the 
same request after getting a timeout error, that could easily lock up the 
entire executor. This is also why we chose to use an unbounded number of 
threads when addressing 
[KAFKA-9374](https://issues.apache.org/jira/browse/KAFKA-9374) (the "hung 
connector" bug).
   
   TL;DR: Do whatever you think is cleanest, don't worry too much about 
precedents in the code base, and it'll probably be fine 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to