C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1152074240


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState 
state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous 
and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be 
retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> 
connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {

Review Comment:
   Nit: we don't have to run `sinkConnectorOffsets` on a separate thread; can 
we limit use of this executor to `sourceConnectorOffsets`?
   
   I realize that this may involve undoing the prior suggestion to centralize 
the try/catch logic in `connectorOffsets` since we'd also have to add a 
try/catch block around the body of the `Runnable` we give to the executor, but 
it seems like a worthwhile tradeoff.
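
   Roughly what I have in mind, as a standalone sketch rather than a patch. 
`OffsetsDispatchSketch`, its `Callback` interface, and `readOffsetsBlocking` 
are hypothetical stand-ins for the real `Worker` members, and the sink path is 
assumed to be non-blocking:

   ```java
   import java.util.concurrent.Executor;

   class OffsetsDispatchSketch {
       // Hypothetical stand-in for Connect's Callback<V>
       interface Callback<V> {
           void onCompletion(Throwable error, V result);
       }

       private final Executor executor;

       OffsetsDispatchSketch(Executor executor) {
           this.executor = executor;
       }

       void connectorOffsets(boolean isSink, Callback<String> cb) {
           if (isSink) {
               sinkOffsets(cb);
           } else {
               sourceOffsets(cb);
           }
       }

       // Sink path: assumed non-blocking, so it stays on the caller's thread
       void sinkOffsets(Callback<String> cb) {
           cb.onCompletion(null, "sink offsets");
       }

       // Source path: the only place that hands work to the executor
       void sourceOffsets(Callback<String> cb) {
           executor.execute(() -> {
               String offsets;
               // The Runnable needs its own try/catch, since an exception
               // thrown here can no longer reach the caller
               try {
                   offsets = readOffsetsBlocking();
               } catch (Exception e) {
                   cb.onCompletion(e, null);
                   return;
               }
               cb.onCompletion(null, offsets);
           });
       }

       // Placeholder for the blocking offset store read
       String readOffsetsBlocking() {
           return "source offsets";
       }
   }
   ```

   The cost is the duplicated error handling inside the `Runnable`; the benefit 
is that sink requests never occupy an executor thread.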



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState 
state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous 
and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be 
retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> 
connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = 
plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", 
connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, 
cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", 
connName);
+                    sourceConnectorOffsets(connName, connector, 
connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be 
retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, 
Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, 
Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new 
SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, 
kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        ListConsumerGroupOffsetsOptions listOffsetsOptions = new 
ListConsumerGroupOffsetsOptions()
+                .timeoutMs((int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+        ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = 
admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
+        listConsumerGroupOffsetsResult.all().whenComplete((result, error) -> {
+            if (error != null) {
+                cb.onCompletion(new ConnectException("Failed to retrieve 
consumer group offsets for sink connector " + connName, error), null);

Review Comment:
   We should also log the error here, for a couple of reasons:
   
   1. The request may have timed out (in spite of our best efforts with setting 
a timeout on the call to `Admin::listConsumerGroupOffsets`), so the user would 
never see this error in the response for their REST request
   2. Even if we do manage to send a response to the REST request that includes 
this error, it's useful to have worker-side logs in place that also note this 
failure
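
   Something along these lines, as a standalone sketch. `java.util.logging` 
stands in for the worker's logger, `RuntimeException` stands in for 
`ConnectException`, and the `Callback` interface and `complete` helper are 
hypothetical:

   ```java
   import java.util.logging.Level;
   import java.util.logging.Logger;

   class SinkOffsetsErrorSketch {
       // java.util.logging is a stand-in for the worker's real logger
       private static final Logger log =
               Logger.getLogger(SinkOffsetsErrorSketch.class.getName());

       // Hypothetical stand-in for Connect's Callback<V>
       interface Callback<V> {
           void onCompletion(Throwable error, V result);
       }

       static <V> void complete(String connName, V result, Throwable error, Callback<V> cb) {
           if (error != null) {
               String msg = "Failed to retrieve consumer group offsets for sink connector " + connName;
               // Log on the worker first, so the failure is recorded even if the
               // REST request has already timed out
               log.log(Level.SEVERE, msg, error);
               // RuntimeException is a stand-in for ConnectException
               cb.onCompletion(new RuntimeException(msg, error), null);
           } else {
               cb.onCompletion(null, result);
           }
       }
   }
   ```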



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +867,18 @@ public List<ConfigKeyInfo> connectorPluginConfig(String 
pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> 
cb) {

Review Comment:
   I think we may want to put this through the tick thread in the 
`DistributedHerder`, to ensure that the connector isn't deleted between when we 
check to see that it exists in the config backing store and when we try to get 
its config:
   
   ```java
   @Override
   public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) 
{
       log.trace("Submitting offset fetch request for connector: {}", connName);
       // Handle this in the tick thread to ensure the connector isn't modified 
concurrently
       addRequest(
               () -> {
                   super.connectorOffsets(connName, cb);
                   return null;
               },
               forwardErrorCallback(cb)
       );
   }
   ```
   
   This also includes a log message since the other ones we emit in the 
`DistributedHerder` class take place before we enqueue the herder request.
   
   We might actually want to tweak the message in the `AbstractHerder` class to 
not use the "Submitting... request" language since it's more like we're 
actually fulfilling the request at that point.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +867,18 @@ public List<ConfigKeyInfo> connectorPluginConfig(String 
pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> 
cb) {
+        log.debug("Submitting offset fetch request for connector: {}", 
connName);

Review Comment:
   Nit: these are all `TRACE` level in `DistributedHerder` (e.g., 
[here](https://github.com/apache/kafka/blob/6e8d0d9850b05fc1de0ceaf77834e68939f782c1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L825),
 
[here](https://github.com/apache/kafka/blob/6e8d0d9850b05fc1de0ceaf77834e68939f782c1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L839),
 etc.)
   
   ```suggestion
           log.trace("Submitting offset fetch request for connector: {}", 
connName);
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState 
state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous 
and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be 
retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> 
connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = 
plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);

Review Comment:
   Nit: we don't need to predeclare the `connector` variable:
   
   ```suggestion
   
               try (LoaderSwap loaderSwap = 
plugins.withClassLoader(connectorLoader)) {
                   Connector connector = 
plugins.newConnector(connectorClassOrAlias);
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState 
state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous 
and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be 
retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> 
connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = 
plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", 
connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, 
cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", 
connName);
+                    sourceConnectorOffsets(connName, connector, 
connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be 
retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, 
Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, 
Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new 
SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, 
kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);

Review Comment:
   Does this leak the admin client if the remainder of this method throws an 
exception?
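
   If it does, one possible shape for the fix is below. This is a standalone 
sketch: `FakeAdmin`, `Callback`, and `sinkOffsets` are hypothetical stand-ins, 
and `CompletableFuture` takes the place of the future returned by the real 
admin client:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Supplier;

   class AdminCleanupSketch {
       // Hypothetical stand-in for the Admin client
       interface FakeAdmin {
           CompletableFuture<String> listOffsets(String groupId);
           void close();
       }

       // Hypothetical stand-in for Connect's Callback<V>
       interface Callback<V> {
           void onCompletion(Throwable error, V result);
       }

       static void sinkOffsets(String groupId, Supplier<FakeAdmin> adminFactory, Callback<String> cb) {
           FakeAdmin admin = adminFactory.get();
           try {
               admin.listOffsets(groupId).whenComplete((result, error) -> {
                   try {
                       cb.onCompletion(error, error == null ? result : null);
                   } finally {
                       // Normal path: close once the async request has finished
                       admin.close();
                   }
               });
           } catch (RuntimeException e) {
               // The completion handler was never registered, so nothing else
               // will close the client; close it here before rethrowing
               admin.close();
               throw e;
           }
       }
   }
   ```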



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState 
state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous 
and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be 
retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> 
connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = 
plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", 
connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, 
cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", 
connName);
+                    sourceConnectorOffsets(connName, connector, 
connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be 
retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, 
Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, 
Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);

Review Comment:
   Should this be `SINK`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState 
state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous 
and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be 
retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> 
connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = 
plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", 
connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, 
cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", 
connName);
+                    sourceConnectorOffsets(connName, connector, 
connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be 
retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, 
Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, 
Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new 
SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, 
kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        ListConsumerGroupOffsetsOptions listOffsetsOptions = new 
ListConsumerGroupOffsetsOptions()
+                .timeoutMs((int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+        ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = 
admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
+        listConsumerGroupOffsetsResult.all().whenComplete((result, error) -> {
+            if (error != null) {
+                cb.onCompletion(new ConnectException("Failed to retrieve 
consumer group offsets for sink connector " + connName, error), null);
+            } else {
+                ConnectorOffsets offsets = 
SinkUtils.consumerGroupOffsetsToConnectorOffsets(result.get(groupId));
+                cb.onCompletion(null, offsets);
+            }
+            Utils.closeQuietly(admin, "Offset fetch admin for sink connector " 
+ connName);
+        });
+    }
+
+    /**
+     * Get the current offsets for a source connector.
+     * @param connName the name of the source connector whose offsets are to 
be retrieved
+     * @param connector the source connector
+     * @param connectorConfig the source connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sourceConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                        Callback<ConnectorOffsets> cb) {
+        SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        ConnectorOffsetBackingStore offsetStore = 
config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, 
connName, connector)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, 
connector);
+        CloseableOffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter, 
internalValueConverter);
+        sourceConnectorOffsets(connName, offsetStore, offsetReader, cb);
+    }
+
+    // Visible for testing
+    void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore 
offsetStore,
+                                CloseableOffsetStorageReader offsetReader, 
Callback<ConnectorOffsets> cb) {
+        offsetStore.configure(config);

Review Comment:
   If this call completes, the store may have [instantiated a 
`KafkaBasedLog`](https://github.com/apache/kafka/blob/6e8d0d9850b05fc1de0ceaf77834e68939f782c1/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L231);
 we should probably move the following line (`offsetStore.start();`) inside the 
`try` block.
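
   A standalone sketch of the ordering I mean. The `Store` interface and 
`readOffsets` are hypothetical stand-ins, and this assumes `stop()` is safe to 
call on a store whose `start()` failed:

   ```java
   class OffsetStoreLifecycleSketch {
       // Hypothetical stand-in for the offset backing store
       interface Store {
           void configure();
           void start();
           String read();
           void stop();
       }

       static String readOffsets(Store store) {
           // configure() may already allocate resources, so everything after
           // it runs under the try/finally
           store.configure();
           try {
               store.start();
               return store.read();
           } finally {
               // Runs even if start() throws; assumes stop() tolerates that
               store.stop();
           }
       }
   }
   ```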



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java:
##########
@@ -53,4 +63,64 @@ public static <K, V> void validateFormat(Map<K, V> 
offsetData) {
                 throw new DataException("Offsets may only contain primitive 
types as values, but field " + entry.getKey() + " contains " + schemaType);
         }
     }
+
+    /**
+     * Parses a partition key that is read back from an offset backing store 
and add / remove the partition in the
+     * provided {@code connectorPartitions} map. If the partition key has an 
unexpected format, a warning log is emitted
+     * and nothing is added / removed in the {@code connectorPartitions} map.
+     * @param partitionKey the partition key to be processed
+     * @param offsetValue the offset value corresponding to the partition key; 
determines whether the partition should
+     *                    be added to the {@code connectorPartitions} map or 
removed depending on whether the offset
+     *                    value is null or not.
+     * @param keyConverter the key converter to deserialize the partition key
+     * @param connectorPartitions the map from connector names to its set of 
partitions which needs to be updated after
+     *                            processing
+     */
+    @SuppressWarnings("unchecked")
+    public static void processPartitionKey(byte[] partitionKey, byte[] 
offsetValue, Converter keyConverter,
+                                           Map<String, Set<Map<String, 
Object>>> connectorPartitions) {
+
+        // The key is expected to always be of the form [connectorName, 
partition] where connectorName is a
+        // string value and partition is a Map<String, Object>
+
+        if (partitionKey == null) {
+            log.warn("Ignoring offset partition key with an unexpected null 
value");
+            return;
+        }
+        // The topic parameter is irrelevant for the JsonConverter which is 
the internal converter used by
+        // Connect workers.
+        Object deserializedValue = keyConverter.toConnectData("", 
partitionKey).value();
+        if (!(deserializedValue instanceof List)) {

Review Comment:
   I think explicitly declaring that the value is null is the best thing here. 
It's probably obvious to both of us what that log line would mean now, but 
we've already looked at this code. A user who sees this may not know, or may at 
least doubt, what it means, and waste time locating the log message in the 
code base to check.
   
   The issue with `NullType` is that this message would still be misleading; 
sure, in a theoretical sense, `null` is a type, but it's most commonly used as 
a value, and people seeing errors about actual vs. expected type could be 
confused if they're only familiar with the latter usage.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState 
state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous 
and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be 
retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> 
connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = 
plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", 
connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, 
cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", 
connName);
+                    sourceConnectorOffsets(connName, connector, 
connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be 
retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, 
Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, 
Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new 
SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, 
kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        ListConsumerGroupOffsetsOptions listOffsetsOptions = new 
ListConsumerGroupOffsetsOptions()
+                .timeoutMs((int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+        ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = 
admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
+        listConsumerGroupOffsetsResult.all().whenComplete((result, error) -> {

Review Comment:
   Nit: would 
[partitionsToOffsetAndMetadata](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.html#partitionsToOffsetAndMetadata())
   work better than `all` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

