C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1149828648


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -343,12 +336,38 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback
         return producerCallback;
     }
 
+    @Override
+    public Set<Map<String, Object>> connectorPartitions(String connectorName) {
+        return connectorPartitions.getOrDefault(connectorName, Collections.emptySet());
+    }
+
+    @SuppressWarnings("unchecked")
     protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
         if (error != null) {
             log.error("Failed to read from the offsets topic", error);
             return;
         }
 
+        if (record.key() != null) {
+            try {
+                // The key should always be a list of the form [connectorName, partition] where connectorName is a
+                // string value and partition is a Map<String, Object>
+                List<Object> keyValue = (List<Object>) keyConverter.toConnectData(topic, record.key()).value();
+                String connectorName = (String) keyValue.get(0);
+                Map<String, Object> partition = (Map<String, Object>) keyValue.get(1);

Review Comment:
   Hmmm... I was envisioning something even more conservative, like what we do for [reading entries from the config topic](https://github.com/apache/kafka/blob/31440b00f3ed8de65f368d41d6cf2efb07ca4a5c/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L967-L980), which gives us friendlier error messages.
   
   Also, d'you think it'd be worth it to pull this logic into `ConnectUtils` or somewhere else, since right now it's basically duplicated across the Kafka- and file-backed offset stores?
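   
   For illustration, a rough sketch of what that shared helper might look like, with the same kind of defensive checks we use for the config topic (the `OffsetUtils` class name, the `processPartitionKey` method name, and its exact parameters are all hypothetical here):
   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
   import org.slf4j.Logger;
   
   // Hypothetical shared utility; class/method names are illustrative only
   public final class OffsetUtils {
   
       private OffsetUtils() {
       }
   
       /**
        * Validate a deserialized offset-record key of the expected form [connectorName, partition]
        * and update the connector's set of known partitions. Malformed keys are logged and skipped
        * rather than thrown, mirroring how entries read from the config topic are handled.
        */
       public static void processPartitionKey(Object key, byte[] value,
                                              Map<String, Set<Map<String, Object>>> connectorPartitions,
                                              Logger log) {
           if (!(key instanceof List)) {
               log.warn("Ignoring offset record with malformed key of type {}; expected a list",
                       key == null ? null : key.getClass().getName());
               return;
           }
           List<?> keyList = (List<?>) key;
           if (keyList.size() != 2 || !(keyList.get(0) instanceof String)) {
               log.warn("Ignoring offset record whose key is not of the form [connectorName, partition]");
               return;
           }
           String connectorName = (String) keyList.get(0);
           if (keyList.get(1) != null && !(keyList.get(1) instanceof Map)) {
               log.warn("Ignoring offset record for connector {} whose partition is not a map", connectorName);
               return;
           }
           @SuppressWarnings("unchecked")
           Map<String, Object> partition = (Map<String, Object>) keyList.get(1);
           if (value == null) {
               // Tombstone record: the offset for this partition was deleted
               connectorPartitions.getOrDefault(connectorName, Collections.emptySet()).remove(partition);
           } else {
               connectorPartitions.computeIfAbsent(connectorName, name -> new HashSet<>()).add(partition);
           }
       }
   }
   ```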



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##########
@@ -53,9 +53,30 @@ public void requestTimeoutMs(long requestTimeoutMs) {
     }
 
     /**
-     * Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
-     * request to the leader.
-      */
+     * Wait for a {@link FutureCallback} to complete and return the result if successful.
+     * @param cb the future callback to wait for
+     * @return the future callback's result if successful
+     * @param <T> the future's result type
+     * @throws Throwable if the future callback isn't successful
+     */
+    public <T> T completeRequest(FutureCallback<T> cb) throws Throwable {
+        try {
+            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        } catch (TimeoutException e) {
+            // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
+            // error is the best option
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
+        } catch (InterruptedException e) {
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
+        }
+    }

Review Comment:
   Fair enough! I wasn't really thinking about the URL/HTTP verb/etc. parameters, but that's a good point.
   
   In that case, two suggestions:
   
   1. Can we leverage this variant in all the other appropriate places in `ConnectorsResource` where forwarding isn't necessary but an asynchronous `Herder` API is used?
   2. Would it be possible to leverage `completeRequest` inside `completeOrForwardRequest` so that we reduce the duplicated logic for handling timeouts, interruptions, specifying a request timeout, etc.? (Rough sketch below.)
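   
   Purely as a sketch of suggestion 2 (the real `completeOrForwardRequest` takes more parameters than shown here, and `forwardToLeader` is a hypothetical helper standing in for the existing forwarding logic):
   ```java
   public <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method,
                                         HttpHeaders headers, Object body, Boolean forward) throws Throwable {
       try {
           // Waiting on the future, timeouts, and interruptions are all handled in one place now
           return completeRequest(cb);
       } catch (RequestTargetException e) {
           // Only the leader-forwarding logic remains specific to this variant
           return forwardToLeader(e, path, method, headers, body, forward);
       }
   }
   
   // Hypothetical helper that would absorb the current forwarding logic
   // (translating the URL, issuing the HTTP request via the RestClient, etc.)
   private <T> T forwardToLeader(RequestTargetException e, String path, String method,
                                 HttpHeaders headers, Object body, Boolean forward) {
       throw new UnsupportedOperationException("sketch only");
   }
   ```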



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,112 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Fetching offsets for sink connector: {}", connName);
+                sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+            } else {
+                log.debug("Fetching offsets for source connector: {}", connName);
+                sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+            }
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SINK);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                    .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
+            listConsumerGroupOffsetsResult.all().whenComplete((result, error) -> {
+                if (error != null) {
+                    cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, error), null);
+                } else {
+                    ConnectorOffsets offsets = SinkUtils.consumerGroupOffsetsToConnectorOffsets(result.get(groupId));
+                    cb.onCompletion(null, offsets);
+                }
+                Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+            });
+        } catch (Exception e) {
+            Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+            cb.onCompletion(e, null);

Review Comment:
   This duplicates logic in `AbstractHerder::connectorOffsets`, which is fine (hanging callbacks that never get invoked are a bit of a nightmare to debug, and catching twice doesn't make things much harder to read), but maybe it'd be a tad cleaner to put the `try`/`catch` logic in the top-level `Worker::connectorOffsets` method so that we don't have to worry about adding it to each branch of that method depending on the connector's type?
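   
   Something along these lines, just as a sketch (the sink branch would still need its own handling for closing the Admin client, so the inner `try`/`catch` there can't go away entirely):
   ```java
   public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
       String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
       ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
   
       try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
           Connector connector = plugins.newConnector(connectorClassOrAlias);
           if (ConnectUtils.isSinkConnector(connector)) {
               log.debug("Fetching offsets for sink connector: {}", connName);
               sinkConnectorOffsets(connName, connector, connectorConfig, cb);
           } else {
               log.debug("Fetching offsets for source connector: {}", connName);
               sourceConnectorOffsets(connName, connector, connectorConfig, cb);
           }
       } catch (Throwable t) {
           // Catching once at the top level means neither branch can leave the callback hanging
           cb.onCompletion(t, null);
       }
   }
   ```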



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java:
##########
@@ -78,6 +85,27 @@ private void load() {
                 ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
                 ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
                 data.put(key, value);
+                if (key != null) {
+                    try {
+                        // The topic parameter is irrelevant for the JsonConverter, which is the internal converter used by
+                        // Connect workers.
+                        List<Object> keyValue = (List<Object>) keyConverter.toConnectData("", key.array()).value();
+                        // The key should always be of the form [connectorName, partition] where connectorName is a
+                        // string value and partition is a Map<String, Object>
+                        String connectorName = (String) keyValue.get(0);
+                        Map<String, Object> partition = (Map<String, Object>) keyValue.get(1);
+                        if (!connectorPartitions.containsKey(connectorName)) {
+                            connectorPartitions.put(connectorName, new HashSet<>());
+                        }
+                        if (value == null) {
+                            connectorPartitions.get(connectorName).remove(partition);

Review Comment:
   Excellent, thanks for the refresher 🙏


