[
https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15989453#comment-15989453
]
Dong Lin commented on KAFKA-4740:
---------------------------------
After Jason showed me this ticket, I realized this has been a known problem
since February. I have submitted the patch
https://github.com/apache/kafka/pull/2864 to make sure that we return all the
data up to the record that failed deserialization and throw the exception on
the next poll.
> Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop
> --------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-4740
> URL: https://issues.apache.org/jira/browse/KAFKA-4740
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
> Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
> Reporter: Sébastien Launay
> Assignee: Sébastien Launay
> Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws a {{SerializationException}} through {{MessageAndMetadata#key()}} and {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients versions < 0.10.1.0, such an exception is swallowed by the {{NetworkClient}} class and results in an infinite loop that the client has no control over:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0, but another issue remains.
> Indeed, the client can now catch the {{SerializationException}}, but the next call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (a full example is available on GitHub \[2\] for most released kafka-clients versions):
> {code:java}
> try (KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
>     kafkaConsumer.subscribe(Arrays.asList("topic"));
>     // Will run till the shutdown hook is called
>     while (!doStop) {
>         try {
>             ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
>             if (!records.isEmpty()) {
>                 logger.info("Got {} messages", records.count());
>                 for (ConsumerRecord<String, Integer> record : records) {
>                     logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
>                         record.partition(), record.offset(), record.key(), record.value());
>                 }
>             } else {
>                 logger.info("No messages to consume");
>             }
>         } catch (SerializationException e) {
>             logger.warn("Failed polling some records", e);
>         }
>     }
> }
> {code}
> when run with the following records (the third record has an invalid {{Integer}} value):
> {noformat}
> printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> printf "\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO consumer.Consumer - Got 2 messages
> INFO consumer.Consumer - Message with partition: 0, offset: 0, key: null, value: 0
> INFO consumer.Consumer - Message with partition: 0, offset: 1, key: null, value: 1
> WARN consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> WARN consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> WARN consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
> ...
> {noformat}
> I don't believe committing offsets would help, and even if it did, it could result in a few well-formed records from that {{ConsumerRecords}} batch never being consumed (data loss).
> I have only seen a few mentions of this bug online \[3\], but I believe it is a critical issue: the new consumer API is no longer in beta, yet if you do not control the producers (which can inject malformed values), or if you use an advanced deserializer that throws such an exception (e.g. the schema-registry {{KafkaAvroDeserializer}} \[4\]), you can end up blocking a consumer from advancing in the stream.
> Current workarounds:
> - use a {{Deserializer}} that does not throw a {{SerializationException}} (e.g. {{ByteArrayDeserializer}}, {{StringDeserializer}})
> - wrap the {{Deserializer}} to catch and log the {{SerializationException}} but return {{null}}, then check for {{null}} in the client code (that's what we use on top of {{KafkaAvroDeserializer}} in case there is an issue reaching the schema registry, or the Avro datum is invalid or incompatible with the reader's schema; a sketch follows this list)
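> A minimal sketch of that wrapping workaround, assuming the hypothetical class name {{NullOnErrorDeserializer}} (it is not part of the kafka-clients API):
> {code:java}
> import java.util.Map;
> import org.apache.kafka.common.errors.SerializationException;
> import org.apache.kafka.common.serialization.Deserializer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> // Hypothetical wrapper (not part of kafka-clients): delegates to any
> // Deserializer and maps deserialization failures to null so that
> // Consumer#poll(long) never throws for malformed records.
> public class NullOnErrorDeserializer<T> implements Deserializer<T> {
>     private static final Logger logger = LoggerFactory.getLogger(NullOnErrorDeserializer.class);
>
>     private final Deserializer<T> delegate;
>
>     public NullOnErrorDeserializer(Deserializer<T> delegate) {
>         this.delegate = delegate;
>     }
>
>     @Override
>     public void configure(Map<String, ?> configs, boolean isKey) {
>         delegate.configure(configs, isKey);
>     }
>
>     @Override
>     public T deserialize(String topic, byte[] data) {
>         try {
>             return delegate.deserialize(topic, data);
>         } catch (SerializationException e) {
>             logger.warn("Failed to deserialize record from topic {}", topic, e);
>             return null; // client code must treat null as "malformed record"
>         }
>     }
>
>     @Override
>     public void close() {
>         delegate.close();
>     }
> }
> {code}
> e.g. {{new KafkaConsumer<>(consumerConfig, new NullOnErrorDeserializer<>(new StringDeserializer()), new NullOnErrorDeserializer<>(new IntegerDeserializer()))}}. The drawback is that a legitimate {{null}} value becomes indistinguishable from a malformed record.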
> Potential solutions:
> # continue to throw {{SerializationException}} when calling {{Consumer#poll(long)}} but skip the malformed record on the next {{Consumer#poll(long)}} (a client-side approximation using {{Consumer#seek}} is sketched after this list)
> # do not throw {{SerializationException}} when calling {{Consumer#poll(long)}} but expose information about invalid records in {{ConsumerRecords}}
> # do not throw {{SerializationException}} when calling {{Consumer#poll(long)}} but store the exception(s) in the {{ConsumerRecord}} object so that they are rethrown on {{ConsumerRecord#key()}} and {{ConsumerRecord#value()}}
> # do not deserialize records during {{Consumer#poll(long)}} but do it when calling {{ConsumerRecord#key()}} and {{ConsumerRecord#value()}} (similar to the old consumer)
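> Until one of these is implemented, a client can approximate the first solution by parsing the partition and offset out of the exception message (visible in the logs above) and seeking past the poisoned record. This is fragile, since that information is not exposed programmatically and a topic name that itself contains dashes would confuse the parsing, and it knowingly drops the record. The {{skipPoisonedRecord}} helper below is purely illustrative:
> {code:java}
> import java.util.regex.Matcher;
> import java.util.regex.Pattern;
> import org.apache.kafka.clients.consumer.Consumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.errors.SerializationException;
>
> public final class PoisonedRecordSkipper {
>     // Matches messages like:
>     //   "Error deserializing key/value for partition topic-0 at offset 2"
>     // Best-effort only: a topic name ending in "-<digits>" would be split wrongly.
>     private static final Pattern POSITION =
>         Pattern.compile("for partition (.+)-(\\d+) at offset (\\d+)");
>
>     // Seeks one offset past the record that failed deserialization, losing that record.
>     public static void skipPoisonedRecord(Consumer<?, ?> consumer, SerializationException e) {
>         Matcher m = POSITION.matcher(String.valueOf(e.getMessage()));
>         if (m.find()) {
>             TopicPartition tp = new TopicPartition(m.group(1), Integer.parseInt(m.group(2)));
>             consumer.seek(tp, Long.parseLong(m.group(3)) + 1);
>         }
>     }
> }
> {code}
> Calling it from the {{catch}} block in the snippet above lets the loop make progress, at the cost of losing the malformed record.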
> I believe any of those solutions breaks compatibility semantics-wise, but not necessarily binary compatibility, as {{SerializationException}} is a {{RuntimeException}} and could therefore be "moved around".
> My preference goes to the last two, and I would be happy to contribute such a change, as well as update the documentation on {{SerializationException}} to reflect that it is not only used when serializing records.
> \[1\] https://github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/message/MessageAndMetadata.scala
> \[2\] https://github.com/slaunay/kafka-consumer-serialization-exception-example
> \[3\] https://groups.google.com/forum/#!topic/kafka-clients/KBSPmY69H94
> \[4\] http://docs.confluent.io/2.0.0/schema-registry/docs/serializer-formatter.html#serializer