[ https://issues.apache.org/jira/browse/KAFKA-19051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Frédérik ROULEAU updated KAFKA-19051:
-------------------------------------
    Summary: Fix implicit acknowledgement cannot be overriden when RecordDeserializationException occurs  (was: Fix implicit acknowledgement cannot be override when RecordDeserializationException occurs)

> Fix implicit acknowledgement cannot be overriden when RecordDeserializationException occurs
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19051
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19051
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Frédérik ROULEAU
>            Priority: Major
>
> When a record generates a RecordDeserializationException, the KIP states that, with explicit acknowledgement, the default Release can be overridden.
> When I tried this, I got:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: The record cannot be acknowledged.
>     at org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123)
>     at org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683)
>     at org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534)
>     at org.example.frouleau.kip932.Main.main(Main.java:62)
> {code}
> It looks like the record had already been released.
> Code used:
> {code:java}
> //....
> } catch (RecordDeserializationException re) {
>     long offset = re.offset();
>     Throwable t = re.getCause();
>     LOGGER.error("Failed to deserialize record at partition={} offset={}",
>             re.topicPartition().partition(), offset, t);
>     ConsumerRecord<String,String> record = new ConsumerRecord<>(re.topicPartition().topic(),
>             re.topicPartition().partition(), offset, "", "");
>     consumer.acknowledge(record, AcknowledgeType.REJECT);
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)