C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1151062947


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##########
@@ -66,17 +87,15 @@ public <T, U> T completeOrForwardRequest(FutureCallback<T> 
cb,
                                                     Translator<T, U> 
translator,
                                                     Boolean forward) throws 
Throwable {
         try {
-            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-
-            if (cause instanceof RequestTargetException) {
+            return completeRequest(cb);
+        } catch (Exception e) {
+            if (e instanceof RequestTargetException) {

Review Comment:
   Can we replace the `catch (Exception e)` and following `if (e instanceof 
...)` blocks with dedicated `catch` blocks for each exception type?
   
   E.g.:
   
   ```java
   try {
       return completeRequest(cb);
   } catch (RequestTargetException e) {
       if (forward == null || forward) {
           // ...
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java:
##########
@@ -53,4 +63,64 @@ public static <K, V> void validateFormat(Map<K, V> 
offsetData) {
                 throw new DataException("Offsets may only contain primitive 
types as values, but field " + entry.getKey() + " contains " + schemaType);
         }
     }
+
+    /**
+     * Parses a partition key that is read back from an offset backing store 
and add / remove the partition in the
+     * provided {@code connectorPartitions} map. If the partition key has an 
unexpected format, a warning log is emitted
+     * and nothing is added / removed in the {@code connectorPartitions} map.
+     * @param partitionKey the partition key to be processed
+     * @param offsetValue the offset value corresponding to the partition key; 
determines whether the partition should
+     *                    be added to the {@code connectorPartitions} map or 
removed depending on whether the offset
+     *                    value is null or not.
+     * @param keyConverter the key converter to deserialize the partition key
+     * @param connectorPartitions the map from connector names to its set of 
partitions which needs to be updated after
+     *                            processing
+     */
+    @SuppressWarnings("unchecked")
+    public static void processPartitionKey(byte[] partitionKey, byte[] 
offsetValue, Converter keyConverter,
+                                           Map<String, Set<Map<String, 
Object>>> connectorPartitions) {
+
+        // The key is expected to always be of the form [connectorName, 
partition] where connectorName is a
+        // string value and partition is a Map<String, Object>
+
+        if (partitionKey == null) {
+            log.warn("Ignoring offset partition key with an unexpected null 
value");
+            return;
+        }
+        // The topic parameter is irrelevant for the JsonConverter which is 
the internal converter used by
+        // Connect workers.
+        Object deserializedValue = keyConverter.toConnectData("", 
partitionKey).value();

Review Comment:
   Should this be `deserializedKey`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java:
##########
@@ -209,4 +209,13 @@ public static String clientIdBase(WorkerConfig config) {
         }
         return result + "-";
     }
+
+    /**
+     * Get the class name for an object in a null-safe manner.
+     * @param o the object whose class name is to be returned
+     * @return "null" if the object is null; or else the object's class name
+     */
+    public static String className(Object o) {

Review Comment:
   Just a general remark, feel free to put non-Connect-specific stuff like this 
in the general-purpose [Utils 
class|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/Utils.java]
 instead of `ConnectUtils`. Don't have to do that for this PR but it's nice to 
keep in mind (especially since there's some great stuff in `Utils` that's easy 
to miss if you aren't in the neighborhood often).



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class OffsetUtilsTest {

Review Comment:
   I love this entire test suite, great attention to detail!



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java:
##########
@@ -53,4 +63,64 @@ public static <K, V> void validateFormat(Map<K, V> 
offsetData) {
                 throw new DataException("Offsets may only contain primitive 
types as values, but field " + entry.getKey() + " contains " + schemaType);
         }
     }
+
+    /**
+     * Parses a partition key that is read back from an offset backing store 
and add / remove the partition in the
+     * provided {@code connectorPartitions} map. If the partition key has an 
unexpected format, a warning log is emitted
+     * and nothing is added / removed in the {@code connectorPartitions} map.
+     * @param partitionKey the partition key to be processed
+     * @param offsetValue the offset value corresponding to the partition key; 
determines whether the partition should
+     *                    be added to the {@code connectorPartitions} map or 
removed depending on whether the offset
+     *                    value is null or not.
+     * @param keyConverter the key converter to deserialize the partition key
+     * @param connectorPartitions the map from connector names to its set of 
partitions which needs to be updated after
+     *                            processing
+     */
+    @SuppressWarnings("unchecked")
+    public static void processPartitionKey(byte[] partitionKey, byte[] 
offsetValue, Converter keyConverter,
+                                           Map<String, Set<Map<String, 
Object>>> connectorPartitions) {
+
+        // The key is expected to always be of the form [connectorName, 
partition] where connectorName is a
+        // string value and partition is a Map<String, Object>
+
+        if (partitionKey == null) {
+            log.warn("Ignoring offset partition key with an unexpected null 
value");
+            return;
+        }
+        // The topic parameter is irrelevant for the JsonConverter which is 
the internal converter used by
+        // Connect workers.
+        Object deserializedValue = keyConverter.toConnectData("", 
partitionKey).value();
+        if (!(deserializedValue instanceof List)) {

Review Comment:
   Worth adding logic to explicitly handle null? I know that it'll be 
implicitly handled by this condition but the message `Expected type: 
java.util.List, actual type: null` kind of looks like an error in the logging 
logic and may confuse users if they see that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to