[ 
https://issues.apache.org/jira/browse/KAFKA-3491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15223604#comment-15223604
 ] 

Ewen Cheslack-Postava commented on KAFKA-3491:
----------------------------------------------

I don't know, this strikes me as a solution that only an implementer would have 
come up with, which is worrying since it's not necessarily clear to users why 
they would do it. In particular, the fact that this didn't come up immediately 
as a solution when we were discussing this offline indicates to me it is 
sufficiently non-obvious that a lot of folks will get it wrong.

This might still end up being the best solution, but I think it's worth 
exploring alternatives that might be clearer to the user. When close() is 
invoked, we really have no useful info about whether processing was complete, 
which is the source of this problem. An alternative would be to expand the API 
with a variant close(boolean exceptional), so the implementation looks like 
this instead:

{code}
  public void run() {
    boolean failed = false;
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
        records.forEach(record -> process(record));
      }
    } catch (WakeupException e) {
      // ignore, we're closing
    } catch (Exception e) {
      log.error("Unexpected error", e);
      failed = true;
    } finally {
      consumer.close(failed);
    }
  }
{code}

The drawback is that I originally thought we could end up with two simple 
cases, consumer.close(true) and consumer.close(false). But since you also need 
to handle graceful exit via, e.g., a shutdown flag, you actually have to track 
whether processing failed and then call close() exactly once. That is, unless 
we properly dedupe close() calls, which it looks like we do, in which case 
maybe we could do:

{code}
  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
        records.forEach(record -> process(record));
      }
    } catch (WakeupException e) {
      // ignore, we're closing
    } catch (Exception e) {
      log.error("Unexpected error", e);
      consumer.close(true);
    } finally {
      consumer.close();
    }
  }
{code}

The error handling is kind of verbose, and I'm not sure whether it's clearer 
to just call consumer.close() in the WakeupException handler as well. But the 
one significant case we care about here (where offsets won't be committed) can 
be made very clear by the consumer.close(true) call, whose expected semantics 
a user can easily look up (whereas the unsubscribe has implicit effects that 
we're relying on).
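
To make the intended semantics concrete, here is a minimal, self-contained 
sketch. SketchConsumer and CloseSketch are hypothetical stand-ins, not the 
real KafkaConsumer, and close(boolean exceptional) is only the variant proposed 
in this comment, not an existing API. The sketch assumes that close() calls 
are deduped and that a non-exceptional close commits the current position when 
auto-commit is enabled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical driver mirroring the second run() loop in this comment.
final class CloseSketch {
    // Polls one batch and "processes" it; processing throws at failAtOffset
    // (pass -1 to never fail). Returns the offset committed after close.
    static long runOnce(int batchSize, long failAtOffset) {
        SketchConsumer consumer = new SketchConsumer(true);
        try {
            for (long offset : consumer.poll(batchSize)) {
                if (offset == failAtOffset) {
                    throw new IllegalStateException("processing failed at " + offset);
                }
            }
        } catch (RuntimeException e) {
            consumer.close(true);   // exceptional: do not commit
        } finally {
            consumer.close();       // no-op if already closed above
        }
        return consumer.committed();
    }

    public static void main(String[] args) {
        System.out.println("clean exit committed up to: " + runOnce(5, -1));
        System.out.println("failed exit committed up to: " + runOnce(5, 2));
    }
}

// Hypothetical in-memory consumer; models only position, commit, and close.
final class SketchConsumer {
    private final boolean autoCommit;
    private long position = 0;   // offset of the next record to hand out
    private long committed = 0;  // last committed offset
    private boolean closed = false;

    SketchConsumer(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    // Hands out batchSize consecutive offsets and advances the position.
    List<Long> poll(int batchSize) {
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            batch.add(position++);
        }
        return batch;
    }

    void close() {
        close(false);
    }

    // Assumed semantics of the proposed variant: skip the final commit
    // when the caller reports that processing ended exceptionally.
    void close(boolean exceptional) {
        if (closed) {
            return;              // dedupe: only the first close() has any effect
        }
        closed = true;
        if (autoCommit && !exceptional) {
            committed = position;
        }
    }

    long committed() {
        return committed;
    }
}
```

In this model the clean run commits the whole batch, while the failing run 
commits nothing, because the close(true) in the catch block wins and the 
close() in the finally block is ignored.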

> Issue with consumer close() in finally block with 'enable.auto.commit=true'
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3491
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3491
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.0, 0.9.0.1
>            Reporter: dan norwood
>            Assignee: Jason Gustafson
>            Priority: Minor
>
> Imagine you have a run loop that looks like the following:
> {code:java}
>   public void run() {
>     try {
>       consumer.subscribe(topics);
>       while (true) {
>         ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
>         records.forEach(record -> process(record));
>       }
>     } catch (WakeupException e) {
>       // ignore, we're closing
>     } catch (Exception e) {
>       log.error("Unexpected error", e);
>     } finally {
>       consumer.close();
>     }
>   }
> {code}
> If you run this with 'enable.auto.commit=true' and throw an exception in the 
> 'process()' method, close() will still try to commit the offsets of all 
> records in the most recent batch that were read but not yet processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
