C0urante commented on code in PR #13434: URL: https://github.com/apache/kafka/pull/13434#discussion_r1152188561
########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java: ########## @@ -53,4 +63,64 @@ public static <K, V> void validateFormat(Map<K, V> offsetData) { throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType); } } + + /** + * Parses a partition key that is read back from an offset backing store and add / remove the partition in the + * provided {@code connectorPartitions} map. If the partition key has an unexpected format, a warning log is emitted + * and nothing is added / removed in the {@code connectorPartitions} map. + * @param partitionKey the partition key to be processed + * @param offsetValue the offset value corresponding to the partition key; determines whether the partition should + * be added to the {@code connectorPartitions} map or removed depending on whether the offset + * value is null or not. + * @param keyConverter the key converter to deserialize the partition key + * @param connectorPartitions the map from connector names to its set of partitions which needs to be updated after + * processing + */ + @SuppressWarnings("unchecked") + public static void processPartitionKey(byte[] partitionKey, byte[] offsetValue, Converter keyConverter, + Map<String, Set<Map<String, Object>>> connectorPartitions) { + + // The key is expected to always be of the form [connectorName, partition] where connectorName is a + // string value and partition is a Map<String, Object> + + if (partitionKey == null) { + log.warn("Ignoring offset partition key with an unexpected null value"); + return; + } + // The topic parameter is irrelevant for the JsonConverter which is the internal converter used by + // Connect workers. + Object deserializedValue = keyConverter.toConnectData("", partitionKey).value(); + if (!(deserializedValue instanceof List)) { Review Comment: Wait, never mind--we already handle that by null-checking the `partitionKey` argument. The converter turns nulls into other nulls so anything here for `deserializedKey` would be redundant. ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java: ########## @@ -53,4 +63,64 @@ public static <K, V> void validateFormat(Map<K, V> offsetData) { throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType); } } + + /** + * Parses a partition key that is read back from an offset backing store and add / remove the partition in the + * provided {@code connectorPartitions} map. If the partition key has an unexpected format, a warning log is emitted + * and nothing is added / removed in the {@code connectorPartitions} map. + * @param partitionKey the partition key to be processed + * @param offsetValue the offset value corresponding to the partition key; determines whether the partition should + * be added to the {@code connectorPartitions} map or removed depending on whether the offset + * value is null or not. + * @param keyConverter the key converter to deserialize the partition key + * @param connectorPartitions the map from connector names to its set of partitions which needs to be updated after + * processing + */ + @SuppressWarnings("unchecked") + public static void processPartitionKey(byte[] partitionKey, byte[] offsetValue, Converter keyConverter, + Map<String, Set<Map<String, Object>>> connectorPartitions) { + + // The key is expected to always be of the form [connectorName, partition] where connectorName is a + // string value and partition is a Map<String, Object> + + if (partitionKey == null) { + log.warn("Ignoring offset partition key with an unexpected null value"); + return; + } + // The topic parameter is irrelevant for the JsonConverter which is the internal converter used by + // Connect workers. + Object deserializedValue = keyConverter.toConnectData("", partitionKey).value(); + if (!(deserializedValue instanceof List)) { Review Comment: Wait, never mind--we already handle that by null-checking the `partitionKey` argument. The converter turns nulls into other nulls so anything here for `deserializedKey` would be redundant. Okay, this is good as-is 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org