C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1149828648
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -343,12 +336,38 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback
         return producerCallback;
     }
 
+    @Override
+    public Set<Map<String, Object>> connectorPartitions(String connectorName) {
+        return connectorPartitions.getOrDefault(connectorName, Collections.emptySet());
+    }
+
+    @SuppressWarnings("unchecked")
     protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
         if (error != null) {
             log.error("Failed to read from the offsets topic", error);
             return;
         }
 
+        if (record.key() != null) {
+            try {
+                // The key should always be a list of the form [connectorName, partition] where connectorName is a
+                // string value and partition is a Map<String, Object>
+                List<Object> keyValue = (List<Object>) keyConverter.toConnectData(topic, record.key()).value();
+                String connectorName = (String) keyValue.get(0);
+                Map<String, Object> partition = (Map<String, Object>) keyValue.get(1);

Review Comment:
Hmmm... I was envisioning something even more conservative, like what we do for [reading entries from the config topic](https://github.com/apache/kafka/blob/31440b00f3ed8de65f368d41d6cf2efb07ca4a5c/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L967-L980), which gives us friendlier error messages.

Also, d'you think it'd be worth it to pull this logic into `ConnectUtils` or somewhere else, since right now it's basically duplicated across the Kafka- and file-backed offset stores?


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##########
@@ -53,9 +53,30 @@ public void requestTimeoutMs(long requestTimeoutMs) {
     }
 
     /**
-     * Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
-     * request to the leader.
-     */
+     * Wait for a {@link FutureCallback} to complete and return the result if successful.
+     * @param cb the future callback to wait for
+     * @return the future callback's result if successful
+     * @param <T> the future's result type
+     * @throws Throwable if the future callback isn't successful
+     */
+    public <T> T completeRequest(FutureCallback<T> cb) throws Throwable {
+        try {
+            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        } catch (TimeoutException e) {
+            // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
+            // error is the best option
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
+        } catch (InterruptedException e) {
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
+        }
+    }

Review Comment:
Fair enough! I wasn't really thinking about the URL/HTTP verb/etc. parameters but that's a good point. In that case, two suggestions:

1. Can we leverage this variant in all the other appropriate places in `ConnectorsResource` where forwarding isn't necessary but an asynchronous `Herder` API is used?
2. Would it be possible to leverage `completeRequest` inside `completeOrForwardRequest` so that we reduce the duplicated logic for handling timeouts, interruptions, specifying a request timeout, etc.?
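For the second suggestion, roughly what I'm picturing is sketched below. This is untested; the parameter list is trimmed for brevity, `forwardRequest` is just a stand-in for the forwarding block that already lives in `completeOrForwardRequest`, and I'm going from memory on the `RequestTargetException` type:

```java
// Untested sketch -- parameters trimmed, and forwardRequest is a hypothetical extraction
// of the existing forwarding logic rather than a real method.
public <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Boolean forward) throws Throwable {
    try {
        // completeRequest becomes the single owner of the request timeout value and of the
        // ExecutionException / TimeoutException / InterruptedException handling.
        return completeRequest(cb);
    } catch (RequestTargetException e) {
        // completeRequest rethrows the unwrapped cause, so the "this worker can't handle the
        // request" signal lands here directly; forwarding is the only logic left in this method.
        return forwardRequest(path, method, forward, e);
    }
}
```

That way the timeout value and the error translation only have to change in one place if we ever touch them again.

And circling back to the `KafkaOffsetBackingStore` comment above: the kind of shared helper I had in mind for parsing offset record keys might look something like the sketch below. The name, location (`ConnectUtils` or a new utility class), and return type are all placeholders, and it assumes the class has its own static `Logger`; the point is just that it validates the structure and logs a friendly message instead of letting a raw `ClassCastException` or `IndexOutOfBoundsException` escape:

```java
// Hypothetical helper -- name, home, and return type are placeholders.
// Returns null (after logging) for any key that isn't a well-formed [connectorName, partition] list.
@SuppressWarnings("unchecked")
public static Map.Entry<String, Map<String, Object>> parseOffsetKey(Object deserializedKey) {
    if (!(deserializedKey instanceof List)) {
        log.error("Ignoring offset record with unexpected key {}; expected a two-element list of [connector name, partition]",
                deserializedKey);
        return null;
    }
    List<Object> keyList = (List<Object>) deserializedKey;
    if (keyList.size() != 2
            || !(keyList.get(0) instanceof String)
            || (keyList.get(1) != null && !(keyList.get(1) instanceof Map))) {
        log.error("Ignoring offset record with malformed key {}; expected a two-element list of [connector name, partition]",
                keyList);
        return null;
    }
    return new AbstractMap.SimpleImmutableEntry<>((String) keyList.get(0), (Map<String, Object>) keyList.get(1));
}
```

Both the Kafka- and file-backed stores could then call it with `keyConverter.toConnectData(...).value()` and simply skip the record when it returns null.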
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,112 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Fetching offsets for sink connector: {}", connName);
+                sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+            } else {
+                log.debug("Fetching offsets for source connector: {}", connName);
+                sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+            }
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                    .timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
+            listConsumerGroupOffsetsResult.all().whenComplete((result, error) -> {
+                if (error != null) {
+                    cb.onCompletion(new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, error), null);
+                } else {
+                    ConnectorOffsets offsets = SinkUtils.consumerGroupOffsetsToConnectorOffsets(result.get(groupId));
+                    cb.onCompletion(null, offsets);
+                }
+                Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+            });
+        } catch (Exception e) {
+            Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+            cb.onCompletion(e, null);

Review Comment:
This duplicates logic in `AbstractHerder::connectorOffsets`, which is fine (hanging callbacks that never get invoked are a bit of a nightmare to debug and catching twice doesn't make things much harder to read), but maybe it'd be a tad cleaner to put the `try`/`catch` logic in the top-level `Worker::connectorOffsets` method so that we don't have to worry about adding that to each branch of that method depending on the connector's type?
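To make that concrete, this is roughly the shape I'm imagining for the top-level method. Untested sketch based on the diff above, and we'd want to double-check that a branch can't end up invoking the callback twice if it fails after arranging its own error handling:

```java
public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
    String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
    ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);

    try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
        Connector connector = plugins.newConnector(connectorClassOrAlias);
        if (ConnectUtils.isSinkConnector(connector)) {
            log.debug("Fetching offsets for sink connector: {}", connName);
            sinkConnectorOffsets(connName, connector, connectorConfig, cb);
        } else {
            log.debug("Fetching offsets for source connector: {}", connName);
            sourceConnectorOffsets(connName, connector, connectorConfig, cb);
        }
    } catch (Throwable t) {
        // Single catch covering both the sink and source branches, so neither needs its own
        // try/catch just to guarantee the callback is always invoked
        cb.onCompletion(t, null);
    }
}
```

The per-connector-type methods would then only need `try`/`catch` blocks for narrower concerns like closing the admin client, instead of each one re-implementing the "always invoke the callback" guarantee.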
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java:
##########
@@ -78,6 +85,27 @@ private void load() {
                 ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
                 ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
                 data.put(key, value);
+                if (key != null) {
+                    try {
+                        // The topic parameter is irrelevant for the JsonConverter which is the internal converter used by
+                        // Connect workers.
+                        List<Object> keyValue = (List<Object>) keyConverter.toConnectData("", key.array()).value();
+                        // The key should always be of the form [connectorName, partition] where connectorName is a
+                        // string value and partition is a Map<String, Object>
+                        String connectorName = (String) keyValue.get(0);
+                        Map<String, Object> partition = (Map<String, Object>) keyValue.get(1);
+                        if (!connectorPartitions.containsKey(connectorName)) {
+                            connectorPartitions.put(connectorName, new HashSet<>());
+                        }
+                        if (value == null) {
+                            connectorPartitions.get(connectorName).remove(partition);

Review Comment:
Excellent, thanks for the refresher!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org