C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1152188561


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java:
##########
@@ -53,4 +63,64 @@ public static <K, V> void validateFormat(Map<K, V> offsetData) {
                 throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType);
         }
     }
+
+    /**
+     * Parses a partition key that is read back from an offset backing store and add / remove the partition in the
+     * provided {@code connectorPartitions} map. If the partition key has an unexpected format, a warning log is emitted
+     * and nothing is added / removed in the {@code connectorPartitions} map.
+     * @param partitionKey the partition key to be processed
+     * @param offsetValue the offset value corresponding to the partition key; determines whether the partition should
+     *                    be added to the {@code connectorPartitions} map or removed depending on whether the offset
+     *                    value is null or not.
+     * @param keyConverter the key converter to deserialize the partition key
+     * @param connectorPartitions the map from connector names to its set of partitions which needs to be updated after
+     *                            processing
+     */
+    @SuppressWarnings("unchecked")
+    public static void processPartitionKey(byte[] partitionKey, byte[] offsetValue, Converter keyConverter,
+                                           Map<String, Set<Map<String, Object>>> connectorPartitions) {
+
+        // The key is expected to always be of the form [connectorName, partition] where connectorName is a
+        // string value and partition is a Map<String, Object>
+
+        if (partitionKey == null) {
+            log.warn("Ignoring offset partition key with an unexpected null value");
+            return;
+        }
+        // The topic parameter is irrelevant for the JsonConverter which is the internal converter used by
+        // Connect workers.
+        Object deserializedValue = keyConverter.toConnectData("", partitionKey).value();
+        if (!(deserializedValue instanceof List)) {

Review Comment:
   Wait, never mind: we already handle that by null-checking the `partitionKey` 
argument. The converter maps a null input to a null output, so an extra null 
check on the deserialized key (`deserializedValue`) here would be redundant.



   
   Okay, this is good as-is 👍 
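
   For anyone following along, here is a minimal, self-contained sketch of why 
the extra check would be redundant. It does not use the real Connect classes: 
the `NullKeyGuardSketch` class and its `deserialize` and `classify` helpers are 
hypothetical stand-ins, and `deserialize` only assumes the behavior described 
above (null bytes in, null value out). It also shows that Java's `instanceof` 
is false for null, so a null value would fall into the "unexpected format" 
branch even without the early return.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class NullKeyGuardSketch {

    // Hypothetical stand-in for the key converter (NOT Connect's JsonConverter):
    // assumes a null input yields a null value, as described in the review comment.
    static Object deserialize(byte[] raw) {
        if (raw == null) {
            return null;
        }
        // Pretend every non-null key decodes to [connectorName, partition].
        return List.of("connector-name", new String(raw, StandardCharsets.UTF_8));
    }

    // Mirrors the shape of the guard clauses in the diff, returning a label
    // instead of logging so the outcome is easy to inspect.
    static String classify(byte[] partitionKey) {
        if (partitionKey == null) {
            return "ignored: null key";
        }
        Object value = deserialize(partitionKey);
        // instanceof is false for null, so no separate null check is needed here.
        if (!(value instanceof List)) {
            return "ignored: unexpected format";
        }
        return "accepted";
    }

    public static void main(String[] args) {
        Object nothing = null;
        System.out.println(classify(null));
        System.out.println(classify("p0".getBytes(StandardCharsets.UTF_8)));
        System.out.println(nothing instanceof List);
    }
}
```

   The null key never reaches the stand-in converter, and even if it did, the 
`instanceof List` test would reject the resulting null.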


