[ 
https://issues.apache.org/jira/browse/KAFKA-19051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939605#comment-17939605
 ] 

Andrew Schofield commented on KAFKA-19051:
------------------------------------------

Yes, I've been thinking about this. I really don't like creating a new fake 
instance of ConsumerRecord and then using it to acknowledge. I think there are 
two paths forward.

1) Just change the KIP and remove the part about "the application can override 
this if it is using explicit acknowledgement". Or

2) Provide a config such as 
`share.acknowledge.type.on.deserialization.exception` which could then let the 
application override the default RELEASE with something such as REJECT.
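
For illustration only, option (2) could resolve its setting roughly as sketched below. The config name is the one proposed above and does not exist in Kafka today. `AckType`, `DeserializationAckConfig` and `resolve` are hypothetical stand-ins (the real client has its own `AcknowledgeType` enum), so treat this as a sketch of the idea and not as a proposed implementation.

```java
import java.util.Locale;
import java.util.Map;

public class DeserializationAckConfig {
    // Hypothetical local stand-in for the client's acknowledgement types.
    enum AckType { ACCEPT, RELEASE, REJECT }

    // Name proposed in this comment; an assumption, not an existing Kafka config.
    static final String CONFIG = "share.acknowledge.type.on.deserialization.exception";

    // Returns the configured type, falling back to RELEASE when the config is unset or blank.
    // An unrecognised value makes valueOf throw IllegalArgumentException.
    static AckType resolve(Map<String, String> props) {
        String raw = props.get(CONFIG);
        if (raw == null || raw.isBlank()) {
            return AckType.RELEASE;
        }
        return AckType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));                 // prints RELEASE
        System.out.println(resolve(Map.of(CONFIG, "reject"))); // prints REJECT
    }
}
```

With something like this, the application would never need to construct a ConsumerRecord by hand, because the client would apply the configured type itself when deserialization fails.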

Personally, I prefer (1) and think that (2) just overcomplicates things. The 
whole point of automatically releasing the records is that the effect of an 
undeserializable record is short-lived.

What do [~brandboat] and [~frouleau] think?

> Fix implicit acknowledgement cannot be overridden when 
> RecordDeserializationException occurs
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19051
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19051
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Frédérik ROULEAU
>            Assignee: Kuan Po Tseng
>            Priority: Major
>
> When a record generates a RecordDeserializationException, the KIP says that, 
> with explicit acknowledgement, the default RELEASE can be overridden.
> When I tried this, I got:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: The record cannot be acknowledged.
>     at org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123)
>     at org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683)
>     at org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534)
>     at org.example.frouleau.kip932.Main.main(Main.java:62) {code}
> It looks like the record was already released.
> Code used:
> {code:java}
> //....
> } catch (RecordDeserializationException re) {
>     long offset = re.offset();
>     Throwable t = re.getCause();
>     LOGGER.error("Failed to deserialize record at partition={} offset={}",
>         re.topicPartition().partition(), offset, t);
>     ConsumerRecord<String,String> record = new ConsumerRecord<>(
>         re.topicPartition().topic(), re.topicPartition().partition(), offset, "", "");
>     consumer.acknowledge(record, AcknowledgeType.REJECT);
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
