yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1151375965
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########

@@ -866,4 +867,19 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
         }
     }

+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
+        log.debug("Submitting offset fetch request for connector: {}", connName);
+        connectorExecutor.submit(() -> {

Review Comment:
Thanks for the detailed reply!

> With regards to using up threads on the Worker class's executor, you've correctly noted that this one is responsible for running Connector and Task instances. However, that executor doesn't have a bound on the number of threads it allocates, so we don't have to worry about tying it up and preventing it from being used for other purposes. It's possible that an excessive number of offset reset requests might cause problems for the whole JVM process by making too many calls to the executor and spinning up too many new threads, but that's equally possible if we use the AbstractHerder class's connectorExecutor, since that too uses an unbounded number of threads

Yeah, that's right, but my concern wasn't about resource contention between connector / task threads and offset read requests so much as about logical separation of responsibilities. I considered adding a new unbounded cached thread pool `requestExecutor` to the `Worker` to handle offset read requests, but it doesn't really offer much other than a separate name, and it comes with the additional burden of having to manage the new executor's lifecycle (closing cleanly on worker termination etc.). Also, I just noticed that the existing worker executor is not used exclusively to run connector and task threads; it's also passed to worker source tasks to close producers on a separate thread during task cancellation.
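As an aside, the unbounded cached thread pool pattern discussed here can be sketched in isolation. This is a minimal, hypothetical illustration (class and method names are invented, and the `Consumer<String>` callback stands in for the PR's `Callback<ConnectorOffsets>`), not the PR's actual code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class OffsetFetchExample {
    // An unbounded cached thread pool, like the Worker's generically named
    // executor: new threads are created on demand and reused when idle.
    static final ExecutorService executor = Executors.newCachedThreadPool();

    // Hypothetical async offset fetch: the potentially slow read happens on
    // the shared executor and the result is delivered via callback.
    static Future<?> fetchOffsets(String connName, Consumer<String> cb) {
        return executor.submit(() -> {
            // ... in the real worker, this would read from the offset backing store ...
            cb.accept("offsets-for-" + connName);
        });
    }

    public static void main(String[] args) throws Exception {
        AtomicReference<String> result = new AtomicReference<>();
        // Block on the Future only for demonstration; REST request handling
        // would instead complete asynchronously via the callback.
        fetchOffsets("my-connector", result::set).get();
        System.out.println(result.get());
        executor.shutdown();
    }
}
```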
So, I've elected to simply reuse the existing `executor` (which is generically named anyway) in the `Worker` for servicing the async offset get request as well (and we now no longer make use of the executor in the `AbstractHerder`). Let me know what you think!

##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java:
##########

@@ -53,4 +63,64 @@ public static <K, V> void validateFormat(Map<K, V> offsetData) {
                 throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType);
             }
         }
+
+    /**
+     * Parses a partition key that is read back from an offset backing store and adds / removes the partition in the
+     * provided {@code connectorPartitions} map. If the partition key has an unexpected format, a warning log is emitted
+     * and nothing is added / removed in the {@code connectorPartitions} map.
+     * @param partitionKey the partition key to be processed
+     * @param offsetValue the offset value corresponding to the partition key; determines whether the partition should
+     *                    be added to the {@code connectorPartitions} map or removed, depending on whether the offset
+     *                    value is null or not.
+     * @param keyConverter the key converter to deserialize the partition key
+     * @param connectorPartitions the map from connector names to their sets of partitions, which needs to be updated
+     *                            after processing
+     */
+    @SuppressWarnings("unchecked")
+    public static void processPartitionKey(byte[] partitionKey, byte[] offsetValue, Converter keyConverter,
+                                           Map<String, Set<Map<String, Object>>> connectorPartitions) {
+
+        // The key is expected to always be of the form [connectorName, partition] where connectorName is a
+        // string value and partition is a Map<String, Object>
+
+        if (partitionKey == null) {
+            log.warn("Ignoring offset partition key with an unexpected null value");
+            return;
+        }
+        // The topic parameter is irrelevant for the JsonConverter which is the internal converter used by
+        // Connect workers.
+        Object deserializedValue = keyConverter.toConnectData("", partitionKey).value();
+        if (!(deserializedValue instanceof List)) {

Review Comment:
Hm yeah, the intention was to handle it through the `instanceof` check itself and let the `className` method handle `null` appropriately. `type: null` doesn't sound too bad to me personally, since `null` is [technically a type](https://docs.oracle.com/javase/specs/jls/se8/html/jls-4.html#jls-4.1) with only one possible value. Would you prefer using [NullType](https://docs.oracle.com/javase/8/docs/api/javax/lang/model/type/NullType.html) instead?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##########

@@ -66,17 +87,15 @@ public <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
                                          Translator<T, U> translator,
                                          Boolean forward) throws Throwable {
         try {
-            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-
-            if (cause instanceof RequestTargetException) {
+            return completeRequest(cb);
+        } catch (Exception e) {
+            if (e instanceof RequestTargetException) {

Review Comment:
Oops, of course.
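For context on why the direct `instanceof` check in the diff above differs from the original: `Future.get` wraps a failed task's exception in an `ExecutionException`, so the original exception's type is visible only on the cause, not on the caught exception itself. A minimal standalone sketch of that behavior (not the PR's code, and `IllegalStateException` is just a stand-in for `RequestTargetException`):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutionExceptionUnwrap {
    // Runs a task that always fails and returns the throwable that
    // Future.get delivers to the caller.
    static Throwable caughtFromFailingTask() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> f = pool.submit((Runnable) () -> {
                throw new IllegalStateException("boom");
            });
            f.get();
            return null; // unreachable: the task always fails
        } catch (ExecutionException e) {
            return e;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable caught = caughtFromFailingTask();
        // The caught exception is an ExecutionException wrapper, so an
        // instanceof check against the task's exception type must be done
        // on the cause, not on the exception itself.
        System.out.println(caught instanceof IllegalStateException);            // false
        System.out.println(caught.getCause() instanceof IllegalStateException); // true
    }
}
```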
My bad, I guess I was paying too much attention to keeping things exactly as they were while porting over the changes from `completeRequest`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org