fred-ro commented on code in PR #15691:
URL: https://github.com/apache/kafka/pull/15691#discussion_r1609600830


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -311,25 +311,35 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers,
                                             Optional<Integer> leaderEpoch,
                                             TimestampType timestampType,
                                             Record record) {
+        long offset = record.offset();
+        long timestamp = record.timestamp();
+        ByteBuffer keyBytes = record.key();
+        ByteBuffer valueBytes = record.value();
+        Headers headers = new RecordHeaders(record.headers());
+        K key;
+        V value;
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
-            ByteBuffer valueBytes = record.value();
-            V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                    timestamp, timestampType,
-                    keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
-                    valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
-                    key, value, headers, leaderEpoch);
+            key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
         } catch (RuntimeException e) {
-            log.error("Deserializers with error: {}", deserializers);
-            throw new RecordDeserializationException(partition, record.offset(),
-                    "Error deserializing key/value for partition " + partition +
+            throw new RecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY,

Review Comment:
   Done, but via a helper method that creates the exception, because static 
code analysis complained otherwise.
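
   For illustration only, here is a minimal sketch of what "a method to create the exception" could look like. It is not the code from this PR: `DeserializationSketch`, `Origin`, `SketchDeserializationException`, `newDeserializationException`, and `deserialize` are hypothetical stand-ins, and the real `RecordDeserializationException` constructor takes different arguments. The idea shown is only that the message building and exception construction live in one small factory method, so the `catch` blocks for key and value stay short.

```java
import java.util.function.Function;

public class DeserializationSketch {
    // Hypothetical stand-in for the origin enum referenced in the diff.
    public enum Origin { KEY, VALUE }

    // Hypothetical stand-in exception; NOT Kafka's RecordDeserializationException.
    public static class SketchDeserializationException extends RuntimeException {
        public final Origin origin;
        public final long offset;

        public SketchDeserializationException(Origin origin, long offset,
                                              String message, Throwable cause) {
            super(message, cause);
            this.origin = origin;
            this.offset = offset;
        }
    }

    // Factory method: builds the message and the exception in one place,
    // so each catch block only has to call it.
    public static SketchDeserializationException newDeserializationException(
            Origin origin, String topicPartition, long offset, RuntimeException cause) {
        String message = "Error deserializing " + origin + " for partition "
                + topicPartition + " at offset " + offset;
        return new SketchDeserializationException(origin, offset, message, cause);
    }

    // Applies the deserializer and wraps any RuntimeException via the factory method.
    public static <T> T deserialize(Origin origin, String topicPartition, long offset,
                                    byte[] bytes, Function<byte[], T> deserializer) {
        if (bytes == null) {
            return null; // mirrors the null check on keyBytes/valueBytes in the diff
        }
        try {
            return deserializer.apply(bytes);
        } catch (RuntimeException e) {
            throw newDeserializationException(origin, topicPartition, offset, e);
        }
    }
}
```

   With this shape, the key and value paths differ only in the `Origin` they pass, and the original cause is preserved on the wrapped exception.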


