[ https://issues.apache.org/jira/browse/KAFKA-3491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15224527#comment-15224527 ]
Jason Gustafson commented on KAFKA-3491:
----------------------------------------

This is a tough one. I tend to agree with [~ewencp] that the workaround is probably not obvious, but I'm reluctant to turn off the commit on close behavior since 99% of the time, it's what you want.

I think the "at least once guarantee" requires some qualification even outside the scope of this problem: to get it, you need to handle all of the records returned in poll(). In other words, the guarantee only says that the records have been returned to the user at least once. The main problem with that is that we don't give the user a way to abort a batch.

Initially I was considering adding either an abort() or rewind() API which could be used to reset positions to the last committed offsets, but then I thought of unsubscribe() which works if the consumer is going to be immediately closed anyway. However, maybe the rewind() option would be more intuitive for users in this case and could also be useful in other contexts (e.g. if a downstream system has become unresponsive, I might need to abort a batch, but keep the consumer running). Does that make sense?
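
To make the rewind() idea concrete, here is a minimal sketch using a toy, single-partition, in-memory model. It is not the Kafka client API: ToyConsumer, RewindDemo, and the rewind() method are hypothetical names made up for illustration, and close() is only assumed to behave like commit-on-close with auto-commit enabled. The sketch shows that a failure partway through a batch still commits the whole batch on close, unless positions are first reset to the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy single-partition consumer. Hypothetical model, NOT the Kafka client API. */
class ToyConsumer {
    private final int logEnd;          // number of records in the toy log
    private final boolean autoCommit;  // stands in for enable.auto.commit
    private long position = 0;         // next offset poll() will return
    private long committed = 0;        // last committed offset

    ToyConsumer(int logEnd, boolean autoCommit) {
        this.logEnd = logEnd;
        this.autoCommit = autoCommit;
    }

    /** Returns up to maxRecords offsets and advances the position past them. */
    List<Long> poll(int maxRecords) {
        List<Long> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position < logEnd) {
            batch.add(position++);
        }
        return batch;
    }

    /** Hypothetical rewind(): reset the position to the last committed offset. */
    void rewind() {
        position = committed;
    }

    /** Models commit-on-close: with auto-commit on, close() commits the position. */
    void close() {
        if (autoCommit) {
            committed = position;
        }
    }

    long committed() {
        return committed;
    }
}

class RewindDemo {
    /** Polls one batch of 5, fails on offset 2, closes; returns the committed offset. */
    static long run(boolean rewindOnFailure) {
        ToyConsumer consumer = new ToyConsumer(10, true);
        try {
            for (long offset : consumer.poll(5)) {
                if (offset == 2) {
                    throw new IllegalStateException("process() failed at offset " + offset);
                }
            }
        } catch (IllegalStateException e) {
            if (rewindOnFailure) {
                consumer.rewind(); // abort the batch before close() can commit it
            }
        } finally {
            consumer.close();
        }
        return consumer.committed();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // 5: offsets 2..4 committed but never processed
        System.out.println(run(true));  // 0: the whole batch is redelivered next time
    }
}
```

In the rewind case, offsets 0 and 1 were already processed and would be seen again after a restart, which is consistent with an at-least-once guarantee rather than exactly-once.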

> Issue with consumer close() in finally block with 'enable.auto.commit=true'
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3491
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3491
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0, 0.9.0.1
>            Reporter: dan norwood
>            Assignee: Jason Gustafson
>            Priority: Minor
>
> imagine you have a run loop that looks like the following:
> {code:java}
> public void run() {
>     try {
>         consumer.subscribe(topics);
>         while (true) {
>             ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
>             records.forEach(record -> process(record));
>         }
>     } catch (WakeupException e) {
>         // ignore, we're closing
>     } catch (Exception e) {
>         log.error("Unexpected error", e);
>     } finally {
>         consumer.close();
>     }
> }
> {code}
> if you run this with 'enable.auto.commit=true' and throw an exception in the
> 'process()' method you will still try to commit all the read, but
> unprocessed, offsets in the most recent batch.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)