C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1149813420


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +867,19 @@ public List<ConfigKeyInfo> connectorPluginConfig(String 
pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> 
cb) {
+        log.debug("Submitting offset fetch request for connector: {}", 
connName);
+        connectorExecutor.submit(() -> {

Review Comment:
   Now that the `Worker` API is callback based, can we move any dispatching of 
tasks to executors (or different threads in general) to there and invoke 
`Worker::connectorOffsets` directly in the calling thread for this method?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,105 @@ public void setTargetState(String connName, TargetState 
state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be 
retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, 
String> connectorConfig) {
+        String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }

Review Comment:
   It's a bit of an edge case, but the connector's Kafka clients may be 
configured with some kind of pluggable interface (maybe a metrics reporter?), 
in which case we'd want to try to load that from the connector's plugin 
directory first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to