[ https://issues.apache.org/jira/browse/KAFKA-3491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15224792#comment-15224792 ]
Guozhang Wang commented on KAFKA-3491:
--------------------------------------

I think when the event loop is single threaded, it is safe to rewind only to the last poll() without adding any additional state to track, since we know that poll() completed without failures. This could potentially save lots of duplicates compared with rewinding to the previous committed offset, depending on the auto commit interval.

The tricky thing happens when the event loop is not single threaded and not "synchronized", i.e. something like:

{code}
while (true) {
    ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
    // Give each record to a thread pool, and do not wait until all threads
    // have completed processing before moving on to the next loop.
    records.forEach(record -> executorPoll.process(record));
}
{code}

But in this case, users already need to handle a more general multi-threading issue: managing which records' offsets have really been processed successfully. Right?

> Issue with consumer close() in finally block with 'enable.auto.commit=true'
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3491
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3491
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0, 0.9.0.1
>            Reporter: dan norwood
>            Assignee: Jason Gustafson
>            Priority: Minor
>
> imagine you have a run loop that looks like the following:
> {code:java}
> public void run() {
>     try {
>         consumer.subscribe(topics);
>         while (true) {
>             ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
>             records.forEach(record -> process(record));
>         }
>     } catch (WakeupException e) {
>         // ignore, we're closing
>     } catch (Exception e) {
>         log.error("Unexpected error", e);
>     } finally {
>         consumer.close();
>     }
> }
> {code}
> if you run this with 'enable.auto.commit=true' and throw an exception in the
> 'process()' method you will still try to commit all the read, but
> unprocessed, offsets in the most recent batch.
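As an illustration of the multi-threaded case raised in the comment, here is a minimal sketch of the bookkeeping a user might do to know which offsets have really been processed. `ProcessedOffsetTracker` is a hypothetical helper, not part of the Kafka client. It assumes one tracker per partition, offsets that are contiguous (no gaps, e.g. from log compaction), and worker threads that call `markProcessed` only after a record succeeded.

```java
import java.util.TreeSet;

/**
 * Hypothetical helper (not a Kafka API): tracks which offsets of ONE partition
 * have finished processing, so that only a contiguous processed prefix is
 * ever reported as safe to commit. Assumes offsets have no gaps.
 */
final class ProcessedOffsetTracker {
    // Every offset below this value has been processed successfully.
    private long nextToCommit;
    // Offsets that completed out of order, i.e. above a still-unprocessed offset.
    private final TreeSet<Long> completedAhead = new TreeSet<>();

    ProcessedOffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /** Called by a worker thread after the record at this offset succeeded. */
    synchronized void markProcessed(long offset) {
        if (offset < nextToCommit) {
            return; // already covered, e.g. a duplicate delivered after a rewind
        }
        completedAhead.add(offset);
        // Advance over every offset that is now contiguous with the prefix.
        while (!completedAhead.isEmpty() && completedAhead.first() == nextToCommit) {
            completedAhead.pollFirst();
            nextToCommit++;
        }
    }

    /** Offset of the next record to read once everything before it is done. */
    synchronized long committableOffset() {
        return nextToCommit;
    }
}
```

With 'enable.auto.commit=false', the polling thread could pass `committableOffset()` to an explicit commit (for example `commitSync` with a per-partition offset map) before calling `close()`, so a record that failed or is still in flight holds the commit position back instead of being skipped.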