C0urante commented on code in PR #13434: URL: https://github.com/apache/kafka/pull/13434#discussion_r1149813420
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -866,4 +867,19 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) { } } + @Override + public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) { + log.debug("Submitting offset fetch request for connector: {}", connName); + connectorExecutor.submit(() -> { Review Comment: Now that the `Worker` API is callback based, can we move any dispatching of tasks to executors (or different threads in general) to there and invoke `Worker::connectorOffsets` directly in the calling thread for this method? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1133,6 +1139,105 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt } } + /** + * Get the current offsets for a connector. + * @param connName the name of the connector whose offsets are to be retrieved + * @param connectorConfig the connector's configurations + * @return the connector's offsets + */ + public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) { + String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); + Connector connector; + + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + connector = plugins.newConnector(connectorClassOrAlias); + } Review Comment: It's a bit of an edge case, but the connector's Kafka clients may be configured with some kind of pluggable interface (maybe a metrics reporter?), in which case we'd want to try to load that from the connector's plugin directory first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org