All: I have a semantic question about how the Kafka client library behaves when:
1. You provide your own serialization/deserialization class by setting the corresponding property (value.serializer; value.deserializer on the consumer side)
2. The deserializer throws a serialization exception while deserializing

Based on my tests, not only does the Kafka client swallow that exception (it just logs it), it also tries to reprocess the same record on the next poll, so the record basically behaves like a "poison pill".

Is this a bug, or is it really intended? It does not make sense to me.

Below are the details:

1. The custom deserializer class throws an exception.
2. This exception is caught by org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord (line 625) and rethrown.
3. The rethrown exception is then caught and logged by org.apache.kafka.clients.NetworkClient.poll (line 275).
4. On the next poll, we get the same issue all over again.

Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
	at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:257)
	at org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:246)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
	at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
	at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:274)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:176)
	at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
	at com.morningstar.dp.messaging.common.serialization.avro.AvroNoSchemaGenericSerde.deserialize(AvroNoSchemaGenericSerde.java:72)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:622)
	at org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:566)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
	at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
	at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
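
For illustration, here is a minimal sketch of one possible workaround for the behaviour described above: catch the failure inside the custom deserializer and hand back a marker object instead of throwing, so the consumer can skip or park the bad record itself. This is an assumption about how one might cope with the issue, not something the Kafka client provides. The names Decoded, SafeDecoder and PoisonPillDemo are hypothetical, and a plain java.util.function.Function stands in for the real deserialize logic so that the sketch has no Kafka dependency.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical holder: either a decoded value or the error that prevented decoding.
final class Decoded<T> {
    final T value;          // null when decoding failed
    final Exception error;  // null when decoding succeeded
    final byte[] raw;       // original payload, kept so a bad record can be inspected later

    private Decoded(T value, Exception error, byte[] raw) {
        this.value = value;
        this.error = error;
        this.raw = raw;
    }

    static <T> Decoded<T> ok(T value, byte[] raw) {
        return new Decoded<T>(value, null, raw);
    }

    static <T> Decoded<T> failed(Exception error, byte[] raw) {
        return new Decoded<T>(null, error, raw);
    }

    boolean isOk() {
        return error == null;
    }
}

// Hypothetical wrapper: `inner` stands in for the body of the custom deserialize method.
final class SafeDecoder<T> {
    private final Function<byte[], T> inner;

    SafeDecoder(Function<byte[], T> inner) {
        this.inner = inner;
    }

    Decoded<T> decode(byte[] raw) {
        try {
            return Decoded.ok(inner.apply(raw), raw);
        } catch (RuntimeException e) {
            // Do not rethrow: report the failure as data so the caller decides what to do.
            return Decoded.failed(e, raw);
        }
    }
}

class PoisonPillDemo {
    public static void main(String[] args) {
        // Stand-in decoder: parses the payload as an integer and fails on anything else.
        SafeDecoder<Integer> decoder = new SafeDecoder<Integer>(
            bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));

        Decoded<Integer> good = decoder.decode("42".getBytes(StandardCharsets.UTF_8));
        Decoded<Integer> bad = decoder.decode("oops".getBytes(StandardCharsets.UTF_8));

        System.out.println(good.isOk() + " " + good.value);
        System.out.println(bad.isOk() + " " + bad.error.getClass().getSimpleName());
    }
}
```

In a real consumer the same try/catch would presumably live inside the custom deserialize method (AvroNoSchemaGenericSerde.deserialize in the trace above), and the poll loop would check isOk() on each record value, logging or parking the failed ones rather than letting the exception reach the fetcher.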