C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1152074240
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {

Review Comment:
Nit: we don't have to run `sinkConnectorOffsets` on a separate thread; can we move this executor so that it's only used in `sourceConnectorOffsets`? I realize that this may involve undoing the prior suggestion to centralize the try/catch logic in `connectorOffsets`, since we'd also have to add a try/catch block around the body of the `Runnable` we give to the executor, but it seems like a worthwhile tradeoff.
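For illustration, that restructuring could look roughly like this (a sketch only, not the PR's final code; it reuses the names from the diff above and reinstates a try/catch around the `Runnable` body, as the comment anticipates):
```java
public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
    String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
    ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
    try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
        Connector connector = plugins.newConnector(connectorClassOrAlias);
        if (ConnectUtils.isSinkConnector(connector)) {
            // The admin client's offset listing is already asynchronous, so no extra thread is needed here
            log.debug("Fetching offsets for sink connector: {}", connName);
            sinkConnectorOffsets(connName, connector, connectorConfig, cb);
        } else {
            log.debug("Fetching offsets for source connector: {}", connName);
            // Only the source path, which may block while reading the offset store, goes through the executor
            executor.submit(() -> {
                try (LoaderSwap swap = plugins.withClassLoader(connectorLoader)) {
                    sourceConnectorOffsets(connName, connector, connectorConfig, cb);
                } catch (Exception e) {
                    cb.onCompletion(e, null);
                }
            });
        }
    } catch (Exception e) {
        cb.onCompletion(e, null);
    }
}
```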
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", connName);
+                    sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+        ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
+        listConsumerGroupOffsetsResult.all().whenComplete((result, error) -> {
+            if (error != null) {
+                cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, error), null);

Review Comment:
We should also log the error here, for a couple reasons:
1. The request may have timed out (in spite of our best efforts with setting a timeout on the call to `Admin::listConsumerGroupOffsets`), so the user would never see this error in the response for their REST request
2. Even if we do manage to send a response to the REST request that includes this error, it's useful to have worker-side logs in place that also note this failure
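Concretely, the completion handler might then look something like this (a sketch, using the same fields and helpers as the diff above):
```java
listConsumerGroupOffsetsResult.all().whenComplete((result, error) -> {
    if (error != null) {
        // Log worker-side too: if the REST request has already timed out, this log
        // line is the only record of the failure that the user can find
        log.error("Failed to retrieve consumer group offsets for sink connector {}", connName, error);
        cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, error), null);
    } else {
        cb.onCompletion(null, SinkUtils.consumerGroupOffsetsToConnectorOffsets(result.get(groupId)));
    }
    Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
});
```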
request" language since it's more like we're actually fulfilling the request at that point. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -866,4 +867,18 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) { } } + @Override + public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) { + log.debug("Submitting offset fetch request for connector: {}", connName); Review Comment: Nit: these are all `TRACE` level in `DistributedHerder` (e.g., [here](https://github.com/apache/kafka/blob/6e8d0d9850b05fc1de0ceaf77834e68939f782c1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L825), [here](https://github.com/apache/kafka/blob/6e8d0d9850b05fc1de0ceaf77834e68939f782c1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L839), etc.) ```suggestion log.trace("Submitting offset fetch request for connector: {}", connName); ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt } } + /** + * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the + * request finishes processing. + * + * @param connName the name of the connector whose offsets are to be retrieved + * @param connectorConfig the connector's configurations + * @param cb callback to invoke upon completion of the request + */ + public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) { + executor.submit(() -> { + String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); + Connector connector; + + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + connector = plugins.newConnector(connectorClassOrAlias); Review Comment: Nit: we don't need to predeclare the `connector` variable: ```suggestion try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { Connector connector = plugins.newConnector(connectorClassOrAlias); ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt } } + /** + * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the + * request finishes processing. 
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", connName);
+                    sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);

Review Comment:
Does this leak the admin client if the remainder of this method throws an exception?
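One way to guard against that (an illustrative sketch, not necessarily the fix the PR ends up with) is to close the client on any failure that happens before the `whenComplete` callback takes ownership of it:
```java
Admin admin = adminFactory.apply(adminConfig);
try {
    ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
            .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
    admin.listConsumerGroupOffsets(groupId, listOffsetsOptions)
            .all()
            .whenComplete((result, error) -> {
                // ... existing completion logic, which closes the admin client when it runs ...
            });
} catch (Throwable t) {
    // If we failed before registering the callback, nothing else will close the client
    Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
    throw t;
}
```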
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", connName);
+                    sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);

Review Comment:
Should this be `SINK`?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", connName);
+                    sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+        ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
+        listConsumerGroupOffsetsResult.all().whenComplete((result, error) -> {
+            if (error != null) {
+                cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, error), null);
+            } else {
+                ConnectorOffsets offsets = SinkUtils.consumerGroupOffsetsToConnectorOffsets(result.get(groupId));
+                cb.onCompletion(null, offsets);
+            }
+            Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+        });
+    }
+
+    /**
+     * Get the current offsets for a source connector.
+     * @param connName the name of the source connector whose offsets are to be retrieved
+     * @param connector the source connector
+     * @param connectorConfig the source connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                        Callback<ConnectorOffsets> cb) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector);
+        CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter, internalValueConverter);
+        sourceConnectorOffsets(connName, offsetStore, offsetReader, cb);
+    }
+
+    // Visible for testing
+    void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetStore,
+                                CloseableOffsetStorageReader offsetReader, Callback<ConnectorOffsets> cb) {
+        offsetStore.configure(config);

Review Comment:
If this call completes, the store may have [instantiated a `KafkaBasedLog`](https://github.com/apache/kafka/blob/6e8d0d9850b05fc1de0ceaf77834e68939f782c1/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L231); we should probably move the following line (`offsetStore.start();`) inside the `try` block.
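The rest of the method body isn't quoted above, but the shape being suggested would be roughly the following (a sketch; the actual offset-reading logic is elided since it isn't shown in the diff):
```java
offsetStore.configure(config);
try {
    // configure() may already have instantiated the store's KafkaBasedLog, and start()
    // allocates more resources, so the start call belongs inside the try as well
    offsetStore.start();
    // ... read the offsets via offsetReader and complete cb (elided; not quoted in the diff) ...
} catch (Exception e) {
    cb.onCompletion(e, null);
} finally {
    Utils.closeQuietly(offsetReader, "offset reader for connector " + connName);
    Utils.closeQuietly(offsetStore::stop, "offset backing store for connector " + connName);
}
```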
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java:
##########
@@ -53,4 +63,64 @@ public static <K, V> void validateFormat(Map<K, V> offsetData) {
             throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType);
         }
     }
 
+
+    /**
+     * Parses a partition key that is read back from an offset backing store and add / remove the partition in the
+     * provided {@code connectorPartitions} map. If the partition key has an unexpected format, a warning log is emitted
+     * and nothing is added / removed in the {@code connectorPartitions} map.
+     * @param partitionKey the partition key to be processed
+     * @param offsetValue the offset value corresponding to the partition key; determines whether the partition should
+     *                    be added to the {@code connectorPartitions} map or removed depending on whether the offset
+     *                    value is null or not.
+     * @param keyConverter the key converter to deserialize the partition key
+     * @param connectorPartitions the map from connector names to its set of partitions which needs to be updated after
+     *                            processing
+     */
+    @SuppressWarnings("unchecked")
+    public static void processPartitionKey(byte[] partitionKey, byte[] offsetValue, Converter keyConverter,
+                                           Map<String, Set<Map<String, Object>>> connectorPartitions) {
+
+        // The key is expected to always be of the form [connectorName, partition] where connectorName is a
+        // string value and partition is a Map<String, Object>
+
+        if (partitionKey == null) {
+            log.warn("Ignoring offset partition key with an unexpected null value");
+            return;
+        }
+        // The topic parameter is irrelevant for the JsonConverter which is the internal converter used by
+        // Connect workers.
+        Object deserializedValue = keyConverter.toConnectData("", partitionKey).value();
+        if (!(deserializedValue instanceof List)) {

Review Comment:
I think explicitly declaring that the value is null is the best thing here. It's probably obvious to both of us what that log line would mean now, but we've already looked at this code. If a user sees this message they may not know, or may at least doubt, what it means, and waste time locating it in the code base to check. The issue with `NullType` is that this message would still be misleading; sure, in a theoretical sense, `null` is a type, but it's most commonly used as a value, and people seeing errors about actual vs. expected type could be confused if they're only familiar with the latter usage.
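In other words, something along these lines (sketch):
```java
// Call out the null case explicitly instead of folding it into the
// type-mismatch branch, where a message about "NullType" could mislead
if (deserializedValue == null) {
    log.warn("Ignoring offset partition key with an unexpected null deserialized value");
    return;
}
if (!(deserializedValue instanceof List)) {
    log.warn("Ignoring offset partition key with an unexpected deserialized format (expected a list, got {})",
            deserializedValue.getClass().getSimpleName());
    return;
}
```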
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", connName);
+                    sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+        ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
+        listConsumerGroupOffsetsResult.all().whenComplete((result, error) -> {

Review Comment:
Nit: would [partitionsToOffsetAndMetadata](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.html#partitionsToOffsetAndMetadata()) work better than `all` here?
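A sketch of that alternative (hedged, not the final PR code): `partitionsToOffsetAndMetadata(String)` resolves directly to the offsets of the single requested group, so the `result.get(groupId)` lookup goes away while the rest of the handler stays the same:
```java
admin.listConsumerGroupOffsets(groupId, listOffsetsOptions)
        .partitionsToOffsetAndMetadata(groupId)
        .whenComplete((offsets, error) -> {
            if (error != null) {
                cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, error), null);
            } else {
                // offsets is already the Map<TopicPartition, OffsetAndMetadata> for this group
                cb.onCompletion(null, SinkUtils.consumerGroupOffsetsToConnectorOffsets(offsets));
            }
            Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
        });
```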