yashmayya commented on code in PR #13434: URL: https://github.com/apache/kafka/pull/13434#discussion_r1150173556
########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java: ########## @@ -343,12 +336,38 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback return producerCallback; } + @Override + public Set<Map<String, Object>> connectorPartitions(String connectorName) { + return connectorPartitions.getOrDefault(connectorName, Collections.emptySet()); + } + + @SuppressWarnings("unchecked") protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> { if (error != null) { log.error("Failed to read from the offsets topic", error); return; } + if (record.key() != null) { + try { + // The key should always be a list of the form [connectorName, partition] where connectorName is a + // string value and partition is a Map<String, Object> + List<Object> keyValue = (List<Object>) keyConverter.toConnectData(topic, record.key()).value(); + String connectorName = (String) keyValue.get(0); + Map<String, Object> partition = (Map<String, Object>) keyValue.get(1); Review Comment: Thanks, that's a good example. I've refactored this to have more fine grained error messages and moved it to `OffsetUtils` for re-use (also added a few simple tests while I was at it). ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -866,4 +867,19 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) { } } + @Override + public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) { + log.debug("Submitting offset fetch request for connector: {}", connName); + connectorExecutor.submit(() -> { Review Comment: Hm I just realized that this isn't really similar to `setTargetState` or `startConnector` (from original suggestion [here](https://github.com/apache/kafka/pull/13434#discussion_r1147779547)) - in those two methods we're using callbacks to add a followup request once the original request completes whereas here we're using the callback to actually return the requested information in the request. So there's actually no precedent for handling requests like this in the worker since all other requests are handled on the herder's thread and the only executor in the `Worker` is the one which runs the connector and task threads themselves; I'm not sure we'd want to tack on requests like this to that executor as well. What do you think? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java: ########## @@ -53,9 +53,30 @@ public void requestTimeoutMs(long requestTimeoutMs) { } /** - * Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the - * request to the leader. - */ + * Wait for a {@link FutureCallback} to complete and return the result if successful. + * @param cb the future callback to wait for + * @return the future callback's result if successful + * @param <T> the future's result type + * @throws Throwable if the future callback isn't successful + */ + public <T> T completeRequest(FutureCallback<T> cb) throws Throwable { + try { + return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw e.getCause(); + } catch (TimeoutException e) { + // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server + // error is the best option + throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out"); + } catch (InterruptedException e) { + throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted"); + } + } Review Comment: Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org