C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1146391973


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,106 @@ public void setTargetState(String connName, TargetState 
state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be 
retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, 
String> connectorConfig) {
+        String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }
+
+        if (ConnectUtils.isSinkConnector(connector)) {
+            log.debug("Fetching offsets for sink connector: {}", connName);
+            return sinkConnectorOffsets(connName, connector, connectorConfig);
+        } else {
+            log.debug("Fetching offsets for source connector: {}", connName);
+            return sourceConnectorOffsets(connName, connector, 
connectorConfig);
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be 
retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @return the consumer group offsets for the sink connector
+     */
+    private ConnectorOffsets sinkConnectorOffsets(String connName, Connector 
connector, Map<String, String> connectorConfig) {
+        return sinkConnectorOffsets(connName, connector, connectorConfig, 
Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    ConnectorOffsets sinkConnectorOffsets(String connName, Connector 
connector, Map<String, String> connectorConfig,
+                                          Function<Map<String, Object>, Admin> 
adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new 
SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, 
kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = 
admin.listConsumerGroupOffsets(groupId);

Review Comment:
   I don't think we need to worry too much about this. I cannot imagine a sane 
use case that involves overriding a connector's Kafka clients with different 
Kafka clusters (not just bootstrap servers, but actually different clusters) 
for producer/consumer/admin. I'd be fine with adding a note to our docs that 
that kind of setup isn't supported but I really, really hope that it's not 
necessary and nobody's trying to do that in the first place.
   
   That said, there is a different case we may want to consider: someone may 
have configured consumer overrides for a sink connector, but not admin 
overrides. This may happen if they don't use a DLQ topic. I don't know if we 
absolutely need to handle this now and we may consider filing a follow-up 
ticket to look into this, but one quick-and-dirty thought I've had is to 
configure the admin client used here with a combination of the configurations 
for the connector's admin client and its consumer, giving precedent to the 
latter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to