[ https://issues.apache.org/jira/browse/KAFKA-19051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939605#comment-17939605 ]
Andrew Schofield commented on KAFKA-19051: ------------------------------------------ Yes, I've been thinking about this. I really don't like creating a new fake instance of ConsumerRecord and then using it to acknowledge. I think there are two paths forward. 1) Just change the KIP and remove the part about "the application can override this if it is using explicit acknowledgement". Or 2) Provide a config such as `share.acknowledge.type.on.deserialization.exception` which could then let the application override the default RELEASE with something such as REJECT. Personally, I prefer (1) and think that (2) just overcomplicates things. The whole point of automatically releasing the records is that the effect of a deserializable record is short-lived. What do [~brandboat] and [~frouleau] think? > Fix implicit acknowledgement cannot be overridden when > RecordDeserializationException occurs > -------------------------------------------------------------------------------------------- > > Key: KAFKA-19051 > URL: https://issues.apache.org/jira/browse/KAFKA-19051 > Project: Kafka > Issue Type: Sub-task > Components: consumer > Reporter: Frédérik ROULEAU > Assignee: Kuan Po Tseng > Priority: Major > > When a record generates a RecordDeserializationException, KIP mentioned that > with explicit acknowledgement the default Release can be overridden. > When tried, I have: > {code:java} > Exception in thread "main" java.lang.IllegalStateException: The record cannot > be acknowledged. > at > org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123) > at > org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683) > at > org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534) > at org.example.frouleau.kip932.Main.main(Main.java:62) {code} > It looks like the record was already released. > Code used: > {code:java} > //.... > } catch (RecordDeserializationException re) { > long offset = re.offset(); > Throwable t = re.getCause(); > LOGGER.error("Failed to deserialize record at partition={} offset={}", > re.topicPartition().partition(), offset, t); > ConsumerRecord<String,String> record = new > ConsumerRecord<>(re.topicPartition().topic(), > re.topicPartition().partition(), offset, "", ""); > consumer.acknowledge(record, AcknowledgeType.REJECT); > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)