yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1151375965


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +867,19 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
+        log.debug("Submitting offset fetch request for connector: {}", connName);
+        connectorExecutor.submit(() -> {

Review Comment:
   Thanks for the detailed reply!
   
   > With regards to using up threads on the Worker class's executor, you've correctly noted that this one is responsible for running Connector and Task instances. However, that executor doesn't have a bound on the number of threads it allocates, so we don't have to worry about tying it up and preventing it from being used for other purposes. It's possible that an excessive number of offset reset requests might cause problems for the whole JVM process by making too many calls to the executor and spinning up too many new threads, but that's equally possible if we use the AbstractHerder class's connectorExecutor, since that too uses an unbounded number of threads
   
   Yeah, that's right, but my concern wasn't about resource contention between connector / task threads and offset read requests so much as about a logical separation of responsibilities. I considered adding a new unbounded cached thread pool `requestExecutor` to the `Worker` to handle offset read requests, but it doesn't really offer much beyond a separate name, and it comes with the additional burden of managing the new executor's lifecycle (closing cleanly on worker termination etc.). Also, I just noticed that the existing worker executor isn't used exclusively to run connector and task threads: it's also passed to worker source tasks so that they can close producers on a separate thread during task cancellation. So I've elected to simply re-use the existing `executor` (which is generically named anyway) in the `Worker` to service the async offset get request as well (and we no longer make use of the executor in the `AbstractHerder`). Let me know what you think!
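   To make the pattern under discussion concrete, here is a minimal, self-contained sketch of servicing an async request on a shared unbounded cached thread pool and reporting the result through a callback. It does not use Kafka Connect's actual classes: `Callback`, `connectorOffsets`, and the offset lookup are hypothetical stand-ins for illustration only.

   ```java
   import java.util.Map;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicReference;

   public class SharedExecutorSketch {
       // Hypothetical stand-in for Connect's Callback<T> interface
       interface Callback<T> {
           void onCompletion(Throwable error, T result);
       }

       // General-purpose worker executor: an unbounded cached pool, analogous
       // to the `executor` field being re-used in the PR
       private static final ExecutorService executor = Executors.newCachedThreadPool();

       // Service an offset read request asynchronously, off the caller's thread
       static void connectorOffsets(String connName, Callback<Map<String, Object>> cb) {
           executor.submit(() -> {
               try {
                   // Placeholder for the actual offset store read
                   Map<String, Object> offsets = Map.of("connector", connName);
                   cb.onCompletion(null, offsets);
               } catch (Throwable t) {
                   cb.onCompletion(t, null);
               }
           });
       }

       public static void main(String[] args) throws InterruptedException {
           CountDownLatch latch = new CountDownLatch(1);
           AtomicReference<Map<String, Object>> result = new AtomicReference<>();
           connectorOffsets("my-connector", (error, offsets) -> {
               result.set(offsets);
               latch.countDown();
           });
           latch.await(5, TimeUnit.SECONDS);
           System.out.println(result.get().get("connector"));
           executor.shutdown();
       }
   }
   ```

   Since the pool is unbounded, each request gets a thread without blocking others, which is why sharing one generically named executor costs little beyond the naming concern raised above.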



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java:
##########
@@ -53,4 +63,64 @@ public static <K, V> void validateFormat(Map<K, V> offsetData) {
                throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType);
         }
     }
+
+    /**
+     * Parses a partition key that is read back from an offset backing store and adds / removes the partition in the
+     * provided {@code connectorPartitions} map. If the partition key has an unexpected format, a warning log is emitted
+     * and nothing is added / removed in the {@code connectorPartitions} map.
+     * @param partitionKey the partition key to be processed
+     * @param offsetValue the offset value corresponding to the partition key; determines whether the partition should
+     *                    be added to the {@code connectorPartitions} map or removed, depending on whether the offset
+     *                    value is null or not
+     * @param keyConverter the key converter to deserialize the partition key
+     * @param connectorPartitions the map from connector names to their sets of partitions, which needs to be updated
+     *                            after processing
+     */
+    @SuppressWarnings("unchecked")
+    public static void processPartitionKey(byte[] partitionKey, byte[] offsetValue, Converter keyConverter,
+                                           Map<String, Set<Map<String, Object>>> connectorPartitions) {
+
+        // The key is expected to always be of the form [connectorName, partition] where connectorName is a
+        // string value and partition is a Map<String, Object>
+
+        if (partitionKey == null) {
+            log.warn("Ignoring offset partition key with an unexpected null value");
+            return;
+        }
+        // The topic parameter is irrelevant for the JsonConverter, which is the internal converter used by
+        // Connect workers.
+        Object deserializedValue = keyConverter.toConnectData("", partitionKey).value();
+        if (!(deserializedValue instanceof List)) {

Review Comment:
   Hm yeah, the intention was to handle it through the `instanceof` check itself and let the `className` method handle `null` appropriately. `type: null` doesn't sound too bad to me personally, since `null` is [technically a type](https://docs.oracle.com/javase/specs/jls/se8/html/jls-4.html#jls-4.1) with only one possible value. Would you prefer using [NullType](https://docs.oracle.com/javase/8/docs/api/javax/lang/model/type/NullType.html) instead?
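   The null-handling property being relied on can be shown in a small self-contained sketch. Per the JLS, `x instanceof List` evaluates to `false` when `x` is `null`, so a single `instanceof` check rejects both nulls and non-list values; the `className` helper below is a hypothetical stand-in for the utility mentioned above, not Kafka's actual code.

   ```java
   import java.util.List;
   import java.util.Map;

   public class InstanceofNullCheck {
       // Hypothetical helper mirroring a `className`-style utility: reports the
       // type of a deserialized value, treating null as its own "type"
       static String className(Object o) {
           return o == null ? "null" : o.getClass().getName();
       }

       public static void main(String[] args) {
           Object deserialized = null;
           // instanceof is null-safe: this prints false rather than throwing
           System.out.println(deserialized instanceof List);
           // ... so the warning log can safely report "type: null"
           System.out.println(className(deserialized));

           Object wellFormed = List.of("connectorName", Map.of());
           System.out.println(wellFormed instanceof List);
       }
   }
   ```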



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##########
@@ -66,17 +87,15 @@ public <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
                                                     Translator<T, U> translator,
                                                     Boolean forward) throws Throwable {
         try {
-            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-
-            if (cause instanceof RequestTargetException) {
+            return completeRequest(cb);
+        } catch (Exception e) {
+            if (e instanceof RequestTargetException) {

Review Comment:
   Oops, of course. My bad, I guess I was paying too much attention to keeping things exactly as they were while porting over the changes from `completeRequest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
