[ https://issues.apache.org/jira/browse/KAFKA-3491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15223604#comment-15223604 ]
Ewen Cheslack-Postava commented on KAFKA-3491:
----------------------------------------------

I don't know, this strikes me as a solution that only an implementer would have come up with, which is worrying since it's not necessarily clear to users why they would do it. In particular, the fact that this didn't come up immediately as a solution when we were discussing this offline indicates to me it is sufficiently non-obvious that a lot of folks will get it wrong. This might still end up being the best solution, but I think it's worth exploring alternatives that might be clearer to the user.

When close() is invoked, we really have no useful info about whether processing was complete, which is the source of this problem. An alternative would be to expand the API with a variant close(boolean exceptional), so the implementation looks like this instead:

{code}
public void run() {
    boolean failed = false;
    try {
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
        failed = true;
    } finally {
        consumer.close(failed);
    }
}
{code}

I think the drawback here is that originally I thought we could end up with 2 simple cases of consumer.close(true) and consumer.close(false), but since you need to also handle graceful exit via, e.g., a shutdown flag, you actually have to track how it failed and then call it once.
Unless we properly dedupe close() calls, which it looks like we do, in which case maybe we could do:

{code}
public void run() {
    try {
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
        consumer.close(true);
    } finally {
        consumer.close();
    }
}
{code}

The error handling is kinda verbose, and I'm not sure if it's clearer to just call consumer.close() in the WakeupException handler as well. But the one significant case we care about here (where offsets won't be committed) can be made very clear by the consumer.close(true) call, whose expected semantics a user can easily look up (whereas the unsubscribe has implicit effects that we're relying on).

> Issue with consumer close() in finally block with 'enable.auto.commit=true'
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3491
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3491
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>   Affects Versions: 0.9.0.0, 0.9.0.1
>            Reporter: dan norwood
>            Assignee: Jason Gustafson
>            Priority: Minor
>
> imagine you have a run loop that looks like the following:
> {code:java}
> public void run() {
>     try {
>         consumer.subscribe(topics);
>         while (true) {
>             ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
>             records.forEach(record -> process(record));
>         }
>     } catch (WakeupException e) {
>         // ignore, we're closing
>     } catch (Exception e) {
>         log.error("Unexpected error", e);
>     } finally {
>         consumer.close();
>     }
> }
> {code}
> if you run this with 'enable.auto.commit=true' and throw an exception in the
> 'process()' method you will still try to commit all the read, but
> unprocessed, offsets in the most recent batch.
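
To make the semantics proposed in the comment concrete, here is a minimal self-contained sketch. It does not use the real consumer: FakeConsumer is a hypothetical stand-in, and close(boolean exceptional) is the variant suggested in the comment, not an existing KafkaConsumer method. The sketch assumes that close(true) skips the commit on close, that plain close() commits the current position (mimicking 'enable.auto.commit=true'), and that repeated close() calls are ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a consumer; NOT the real KafkaConsumer API. */
class FakeConsumer {
    private final int batchSize;
    private long position = 0;   // next offset to fetch
    private long committed = 0;  // last committed position
    private boolean closed = false;

    FakeConsumer(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Returns the next batch of offsets and advances the position past them. */
    List<Long> poll() {
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            batch.add(position++);
        }
        return batch;
    }

    void close() {
        close(false);
    }

    /** Assumed semantics: an exceptional close skips the commit; later calls are no-ops. */
    void close(boolean exceptional) {
        if (closed) {
            return; // dedupe repeated close() calls
        }
        closed = true;
        if (!exceptional) {
            committed = position; // simulated auto-commit on close
        }
    }

    long committed() {
        return committed;
    }
}

class CloseSketch {
    /** Polls one batch; processing fails at offset failAt (never, if failAt is negative). */
    static long run(FakeConsumer consumer, long failAt) {
        try {
            for (long offset : consumer.poll()) {
                if (offset == failAt) {
                    throw new IllegalStateException("process() failed at offset " + offset);
                }
            }
        } catch (Exception e) {
            consumer.close(true); // failure path: do not commit unprocessed offsets
        } finally {
            consumer.close();     // no-op if the catch block already closed
        }
        return consumer.committed();
    }

    public static void main(String[] args) {
        System.out.println(run(new FakeConsumer(5), -1)); // clean exit: prints 5
        System.out.println(run(new FakeConsumer(5), 2));  // failure at offset 2: prints 0
    }
}
```

In the failing run nothing from the batch is committed, so the whole batch would be redelivered after a restart. That is the behavior the reporter expected; the original run loop would have committed position 5 in both cases.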