[ 
https://issues.apache.org/jira/browse/KAFKA-7036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-7036.
-----------------------------------
    Resolution: Won't Fix

> Complete the docs of KafkaConsumer#poll
> ---------------------------------------
>
>                 Key: KAFKA-7036
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7036
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Minor
>
> KafkaConsumer#poll has a nice docs about the expected exceptions. However, it 
> lacks the description of SerializationException. Another mirror issue is that 
> KafkaConsumer doesn't catch all type of exception which may be thrown by 
> deserializer (see below). We should use Throwable to replace the 
> RuntimeException so as to catch all exception and then wrap them to 
> SerializationException.
> {code:java}
> private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
>                                          RecordBatch batch,
>                                          Record record) {
>     try {
>         long offset = record.offset();
>         long timestamp = record.timestamp();
>         TimestampType timestampType = batch.timestampType();
>         Headers headers = new RecordHeaders(record.headers());
>         ByteBuffer keyBytes = record.key();
>         byte[] keyByteArray = keyBytes == null ? null : 
> Utils.toArray(keyBytes);
>         K key = keyBytes == null ? null : 
> this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
>         ByteBuffer valueBytes = record.value();
>         byte[] valueByteArray = valueBytes == null ? null : 
> Utils.toArray(valueBytes);
>         V value = valueBytes == null ? null : 
> this.valueDeserializer.deserialize(partition.topic(), headers, 
> valueByteArray);
>         return new ConsumerRecord<>(partition.topic(), partition.partition(), 
> offset,
>                                     timestamp, timestampType, 
> record.checksumOrNull(),
>                                     keyByteArray == null ? 
> ConsumerRecord.NULL_SIZE : keyByteArray.length,
>                                     valueByteArray == null ? 
> ConsumerRecord.NULL_SIZE : valueByteArray.length,
>                                     key, value, headers);
>     } catch (RuntimeException e) {
>         throw new SerializationException("Error deserializing key/value for 
> partition " + partition +
>                 " at offset " + record.offset() + ". If needed, please seek 
> past the record to continue consumption.", e);
>     }
> }{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to