[ 
https://issues.apache.org/jira/browse/KAFKA-19051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frédérik ROULEAU updated KAFKA-19051:
-------------------------------------
    Summary: Fix implicit acknowledgement cannot be overriden when 
RecordDeserializationException occurs  (was: Fix implicit acknowledgement 
cannot be override when RecordDeserializationException occurs)

> Fix implicit acknowledgement cannot be overriden when 
> RecordDeserializationException occurs
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19051
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19051
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Frédérik ROULEAU
>            Priority: Major
>
> When a record triggers a RecordDeserializationException, the KIP states that, 
> with explicit acknowledgement, the default Release can be overridden.
> When I tried this, I got:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: The record cannot be acknowledged.
>     at org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123)
>     at org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683)
>     at org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534)
>     at org.example.frouleau.kip932.Main.main(Main.java:62) {code}
> It looks like the record was already released.
> Code used:
> {code:java}
> //....
> } catch (RecordDeserializationException re) {
>     long offset = re.offset();
>     Throwable t = re.getCause();
>     LOGGER.error("Failed to deserialize record at partition={} offset={}",
>             re.topicPartition().partition(), offset, t);
>     ConsumerRecord<String,String> record = new ConsumerRecord<>(
>             re.topicPartition().topic(), re.topicPartition().partition(), offset, "", "");
>     consumer.acknowledge(record, AcknowledgeType.REJECT);
> } {code}
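
The reporter's hypothesis ("the record was already released") can be illustrated with a small toy model. This is only a sketch and is not Kafka's ShareFetch code: the class AckOverrideSketch, the ToyFetch type and its method names are all hypothetical, and the eager-release behaviour is an assumption taken from the report, not a confirmed diagnosis. It contrasts a record that is finalized as released before the application sees it with one whose Release is only a default that can still be overridden.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Kafka's implementation. All names here are hypothetical.
public class AckOverrideSketch {
    enum AckType { ACCEPT, RELEASE, REJECT }

    // Tracks the acknowledgement state of offsets handed to the application.
    static class ToyFetch {
        // Offsets the application may still acknowledge, with their current (default) type.
        private final Map<Long, AckType> pending = new HashMap<>();
        // Offsets whose acknowledgement was already fixed before the application saw them.
        private final Map<Long, AckType> finalized = new HashMap<>();

        // Assumed faulty behaviour: the bad record is released at once and never becomes pending.
        void deliverCorruptReleasedEagerly(long offset) {
            finalized.put(offset, AckType.RELEASE);
        }

        // Behaviour the report expects: RELEASE is only the default and stays overridable.
        void deliverCorruptOverridable(long offset) {
            pending.put(offset, AckType.RELEASE);
        }

        // Explicit acknowledgement: allowed only for offsets that are still pending.
        void acknowledge(long offset, AckType type) {
            if (!pending.containsKey(offset)) {
                throw new IllegalStateException("The record cannot be acknowledged.");
            }
            pending.put(offset, type);
        }

        AckType stateOf(long offset) {
            return pending.containsKey(offset) ? pending.get(offset) : finalized.get(offset);
        }
    }

    public static void main(String[] args) {
        // Case 1: record already released, so the explicit REJECT is refused.
        ToyFetch eager = new ToyFetch();
        eager.deliverCorruptReleasedEagerly(42L);
        try {
            eager.acknowledge(42L, AckType.REJECT);
        } catch (IllegalStateException e) {
            System.out.println("eager release: " + e.getMessage());
        }

        // Case 2: default RELEASE is overridden by the explicit REJECT.
        ToyFetch overridable = new ToyFetch();
        overridable.deliverCorruptOverridable(42L);
        overridable.acknowledge(42L, AckType.REJECT);
        System.out.println("overridable: " + overridable.stateOf(42L));
    }
}
```

In this model the first case fails with the same message as in the stack trace, while the second case ends with the record in the REJECT state, which is the outcome the report expects from explicit acknowledgement.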



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
