yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1150173556


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -343,12 +336,38 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> 
values, final Callback
         return producerCallback;
     }
 
+    @Override
+    public Set<Map<String, Object>> connectorPartitions(String connectorName) {
+        return connectorPartitions.getOrDefault(connectorName, 
Collections.emptySet());
+    }
+
+    @SuppressWarnings("unchecked")
     protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback 
= (error, record) -> {
         if (error != null) {
             log.error("Failed to read from the offsets topic", error);
             return;
         }
 
+        if (record.key() != null) {
+            try {
+                // The key should always be a list of the form [connectorName, 
partition] where connectorName is a
+                // string value and partition is a Map<String, Object>
+                List<Object> keyValue = (List<Object>) 
keyConverter.toConnectData(topic, record.key()).value();
+                String connectorName = (String) keyValue.get(0);
+                Map<String, Object> partition = (Map<String, Object>) 
keyValue.get(1);

Review Comment:
   Thanks, that's a good example. I've refactored this to have more fine 
grained error messages and moved it to `OffsetUtils` for re-use (also added a 
few simple tests while I was at it).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +867,19 @@ public List<ConfigKeyInfo> connectorPluginConfig(String 
pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> 
cb) {
+        log.debug("Submitting offset fetch request for connector: {}", 
connName);
+        connectorExecutor.submit(() -> {

Review Comment:
   Hm I just realized that this isn't really similar to `setTargetState` or 
`startConnector` (from original suggestion 
[here](https://github.com/apache/kafka/pull/13434#discussion_r1147779547)) - in 
those two methods we're using callbacks to add a followup request once the 
original request completes whereas here we're using the callback to actually 
return the requested information in the request. So there's actually no 
precedent for handling requests like this in the worker since all other 
requests are handled on the herder's thread and the only executor in the 
`Worker` is the one which runs the connector and task threads themselves; I'm 
not sure we'd want to tack on requests like this to that executor as well. What 
do you think?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##########
@@ -53,9 +53,30 @@ public void requestTimeoutMs(long requestTimeoutMs) {
     }
 
     /**
-     * Wait for a FutureCallback to complete. If it succeeds, return the 
parsed response. If it fails, try to forward the
-     * request to the leader.
-      */
+     * Wait for a {@link FutureCallback} to complete and return the result if 
successful.
+     * @param cb the future callback to wait for
+     * @return the future callback's result if successful
+     * @param <T> the future's result type
+     * @throws Throwable if the future callback isn't successful
+     */
+    public <T> T completeRequest(FutureCallback<T> cb) throws Throwable {
+        try {
+            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        } catch (TimeoutException e) {
+            // This timeout is for the operation itself. None of the timeout 
error codes are relevant, so internal server
+            // error is the best option
+            throw new 
ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
"Request timed out");
+        } catch (InterruptedException e) {
+            throw new 
ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
"Request interrupted");
+        }
+    }

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to