[ 
https://issues.apache.org/jira/browse/KAFKA-3491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15223682#comment-15223682
 ] 
Ewen Cheslack-Postava commented on KAFKA-3491:
----------------------------------------------

[~guozhang] Comparing to other clients assumes auto offset commit is a common 
pattern. I'm not really sure it is. Maybe [~edenhill] can comment on that. I'm 
still on the fence if it's actually a good feature to provide (very largely 
because of complications like this -- unless you can *absolutely guarantee* 
correct at-least once w/ "optimistic" (non-failure mode) exactly-once behavior).

My argument for adding "close(boolean failure)" is that it's explicitly clear 
to the user what it means and when they should use it. "consumer.unsubscribe()" 
is less clear because it already exists and has other meanings -- unsubscribing 
is something you can do which has a side effect that happens to make the 
failure handling case work out, but you have to know that it is the "trick" to 
make auto offset commit work. But you could also make it explicit by making it 
obvious in the API when you should close(exception=true) and when you can 
close(exception=false) (or just close()).

> Issue with consumer close() in finally block with 'enable.auto.commit=true'
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3491
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3491
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0, 0.9.0.1
>            Reporter: dan norwood
>            Assignee: Jason Gustafson
>            Priority: Minor
>
> imagine you have a run loop that looks like the following:
> {code:java}
>   public void run() {
>     try {
>       consumer.subscribe(topics);
>       while (true) {
>         ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
>         records.forEach(record -> process(record));
>       }
>     } catch (WakeupException e) {
>       // ignore, we're closing
>     } catch (Exception e) {
>       log.error("Unexpected error", e);
>     } finally {
>       consumer.close();
>     }
>   }
> {code}
> if you run this with 'enable.auto.commit=true' and throw an exception in the 
> 'process()' method you will still try to commit all the read, but 
> unprocessed, offsets in the most recent batch.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to