Chia-Ping Tsai created KAFKA-7036:
-------------------------------------

             Summary: Complete the docs of KafkaConsumer#poll
                 Key: KAFKA-7036
                 URL: https://issues.apache.org/jira/browse/KAFKA-7036
             Project: Kafka
          Issue Type: Improvement
            Reporter: Chia-Ping Tsai
            Assignee: Chia-Ping Tsai

The docs of KafkaConsumer#poll describe the expected exceptions well, but they do not mention SerializationException.

Another minor issue is that KafkaConsumer doesn't catch every type of exception that may be thrown by a deserializer (see below). The catch block handles only RuntimeException, so an Error raised inside a deserializer propagates without being wrapped. We should catch Throwable instead of RuntimeException so that all exceptions are caught and then wrapped in SerializationException.

{code:java}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                    timestamp, timestampType, record.checksumOrNull(),
                                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                    key, value, headers);
    } catch (RuntimeException e) {
        throw new SerializationException("Error deserializing key/value for partition " + partition +
                " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
    }
}
{code}
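
To illustrate the proposed change, here is a minimal, self-contained sketch of the wrapping behaviour. It is not the actual KafkaConsumer code: the nested SerializationException and Deserializer types are simplified, hypothetical stand-ins for the Kafka classes of the same name, and the class name WrapThrowableSketch and the parse helper are assumptions made only for this example. It shows that catching Throwable also wraps an Error (here a NoClassDefFoundError), which a catch of RuntimeException would let through.

```java
import java.nio.charset.StandardCharsets;

public class WrapThrowableSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.SerializationException.
    static class SerializationException extends RuntimeException {
        SerializationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical, simplified stand-in for Kafka's Deserializer interface (no headers).
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Deserializes one payload and wraps any failure, including Errors,
    // in SerializationException, as the issue proposes.
    static <T> T parse(Deserializer<T> deserializer, String topic, long offset, byte[] data) {
        try {
            return data == null ? null : deserializer.deserialize(topic, data);
        } catch (Throwable t) {
            throw new SerializationException("Error deserializing key/value for topic " + topic
                    + " at offset " + offset
                    + ". If needed, please seek past the record to continue consumption.", t);
        }
    }

    public static void main(String[] args) {
        // A well-behaved deserializer returns its value unchanged.
        Deserializer<String> ok = (topic, data) -> new String(data, StandardCharsets.UTF_8);
        System.out.println(parse(ok, "demo", 0L, "hello".getBytes(StandardCharsets.UTF_8)));

        // A deserializer that fails with an Error rather than a RuntimeException.
        Deserializer<String> broken = (topic, data) -> {
            throw new NoClassDefFoundError("com/example/Missing");
        };
        try {
            parse(broken, "demo", 1L, new byte[] {1});
        } catch (SerializationException e) {
            System.out.println("wrapped: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

One trade-off to keep in mind: catching Throwable also intercepts fatal errors such as OutOfMemoryError, so callers that need to tell those apart would have to inspect the cause of the SerializationException.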