[ 
https://issues.apache.org/jira/browse/KAFKA-3491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15224527#comment-15224527
 ] 

Jason Gustafson commented on KAFKA-3491:
----------------------------------------

This is a tough one. I tend to agree with [~ewencp] that the workaround is 
probably not obvious, but I'm reluctant to turn off the commit-on-close 
behavior since 99% of the time it's what you want. I think the "at least once" 
guarantee requires some qualification even outside the scope of this problem: 
to get it, you need to handle all of the records returned by poll(). In other 
words, the guarantee only says that the records have been returned to the user 
at least once. The main problem is that we don't give the user a way to abort 
a batch. Initially I was considering adding either an abort() or rewind() API 
that could be used to reset positions to the last committed offsets, but then 
I thought of unsubscribe(), which works if the consumer is going to be closed 
immediately anyway. However, the rewind() option might be more intuitive for 
users in this case and could also be useful in other contexts (e.g. if a 
downstream system has become unresponsive, I might need to abort a batch but 
keep the consumer running). Does that make sense?
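
To make the rewind() idea concrete, here is a minimal sketch of what it would 
do. No such method exists on the consumer today; FakeConsumer, its fields, and 
the rewind() helper are all hypothetical stand-ins written only for this 
illustration, with partitions modelled as plain strings. The sketch assumes 
that close() with auto commit enabled commits the current positions, which is 
the behavior described in this issue.

```java
import java.util.HashMap;
import java.util.Map;

final class RewindSketch {

    /** Hypothetical in-memory stand-in for the consumer (not the real API). */
    static final class FakeConsumer {
        final Map<String, Long> committed = new HashMap<>();
        final Map<String, Long> position = new HashMap<>();

        /** Simulates poll(): the position moves past the n records returned. */
        void poll(String partition, long n) {
            position.merge(partition, n, Long::sum);
        }

        /** Simulates close() with enable.auto.commit=true: commits positions. */
        void close() {
            committed.putAll(position);
        }
    }

    /** Hypothetical rewind(): reset every position to its last committed offset. */
    static void rewind(FakeConsumer consumer) {
        consumer.position.replaceAll(
            (partition, offset) -> consumer.committed.getOrDefault(partition, 0L));
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.committed.put("topic-0", 5L);
        consumer.position.put("topic-0", 5L);

        consumer.poll("topic-0", 10);   // records returned, position is now 15
        try {
            throw new IllegalStateException("process() failed mid-batch");
        } catch (IllegalStateException e) {
            rewind(consumer);           // abort the batch before closing
        } finally {
            consumer.close();           // commits the rewound position
        }
        System.out.println(consumer.committed.get("topic-0"));
    }
}
```

Without the rewind() call, close() in this sketch would commit offset 15 even 
though none of the ten records were processed. Against the real consumer, the 
same effect could presumably be had by iterating assignment() and calling 
seek() with the result of committed() for each partition, but that is an 
assumption and has not been tested here.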

> Issue with consumer close() in finally block with 'enable.auto.commit=true'
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3491
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3491
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0, 0.9.0.1
>            Reporter: dan norwood
>            Assignee: Jason Gustafson
>            Priority: Minor
>
> imagine you have a run loop that looks like the following:
> {code:java}
>   public void run() {
>     try {
>       consumer.subscribe(topics);
>       while (true) {
>         ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
>         records.forEach(record -> process(record));
>       }
>     } catch (WakeupException e) {
>       // ignore, we're closing
>     } catch (Exception e) {
>       log.error("Unexpected error", e);
>     } finally {
>       consumer.close();
>     }
>   }
> {code}
> if you run this with 'enable.auto.commit=true' and 'process()' throws an 
> exception, close() will still try to commit the offsets of every record in 
> the most recent batch, including records that were read but never processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
