I'd like to get some feedback on changing the offset commit API in the new
consumer. Since this is a user-facing API, I wanted to make sure the change
gets better visibility than the JIRA
(https://issues.apache.org/jira/browse/KAFKA-2123) might.

The motivation is to make it possible to do async commits while still being
able to tell when the commit completes or fails. I'm suggesting changing the
API from

void commit(Map offsets, CommitType)

to

Future<Void> commit(Map<TopicPartition, Long> offsets,
                    ConsumerCommitCallback callback);

which matches the approach used for the producer. The
ConsumerCommitCallback only has one method:

public void onCompletion(Exception exception);

This enables a few different use cases:

* Blocking commit via Future.get(), and blocking with timeouts via
Future.get(long, TimeUnit)
* See exceptions via the future (see discussion of retries below)
* Callback-based notification, so you can keep processing messages and only
take action if something goes wrong, takes too long, etc. This is the use
case that motivated this proposal.
* Fire-and-forget commits via a shorthand commit() API, ignoring the
resulting future.
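To make the use cases above concrete, here is a minimal sketch of the
caller's side, assuming the proposed commit(Map, ConsumerCommitCallback)
signature plus a one-argument shorthand. FakeConsumer and the TopicPartition
record are hypothetical stand-ins so the snippet compiles without Kafka; the
fake completes every commit successfully on a background executor instead of
talking to a coordinator.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CommitUseCases {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // The single-method callback from the proposal.
    interface ConsumerCommitCallback {
        void onCompletion(Exception exception);
    }

    // Hypothetical consumer whose "commits" always succeed asynchronously.
    static class FakeConsumer implements AutoCloseable {
        private final ExecutorService io = Executors.newSingleThreadExecutor();

        Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback) {
            CompletableFuture<Void> result = new CompletableFuture<>();
            io.execute(() -> {
                // A real consumer would send an OffsetCommitRequest here.
                result.complete(null);
                if (callback != null) callback.onCompletion(null);
            });
            return result;
        }

        // Shorthand for fire-and-forget commits: no callback.
        Future<Void> commit(Map<TopicPartition, Long> offsets) {
            return commit(offsets, null);
        }

        @Override
        public void close() {
            io.shutdown(); // already-queued commits still run
        }
    }

    public static void main(String[] args) throws Exception {
        Map<TopicPartition, Long> offsets = Map.of(new TopicPartition("events", 0), 42L);
        try (FakeConsumer consumer = new FakeConsumer()) {
            // Blocking commit with a timeout.
            consumer.commit(offsets, null).get(5, TimeUnit.SECONDS);

            // Callback-based: keep processing, react only on failure.
            consumer.commit(offsets, e -> {
                if (e != null) System.err.println("commit failed: " + e);
            });

            // Fire and forget: the returned future is simply ignored.
            consumer.commit(offsets);

            // Non-blocking completion check.
            Future<Void> pending = consumer.commit(offsets);
            while (!pending.isDone()) {
                Thread.onSpinWait(); // stand-in for "do other work"
            }
        }
    }
}
```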

One big difference between this and the producer API is that an offset
commit doesn't produce any result (except possibly an exception). This leads
to the somewhat awkward Future<Void> signature. I personally prefer that to
the sync/async flag, especially since the Future also provides a non-blocking
way to check whether the commit is complete (Future.isDone()).

I posted a WIP patch to the JIRA. In the process of writing it I found a
few issues that might be worth discussing:

1. Retries. In the old approach, this was trivial: retries only applied to
synchronous calls, so we could just loop until the request succeeded.
Do we want to start introducing a retry mechanism here? If so, should it
apply to all types of requests, or will we end up with several different
retry settings for specific cases, like offset commits? The WIP patch lets
errors bubble up through the Future on the first failure, which right now
can cause some tests to fail transiently (e.g. the consumer bounce test).

I think some sort of retry mechanism, even if it's an internal constant
rather than configurable, is probably the right solution, but I want to
figure out how broadly it should apply. I think adding it only for offset
commits isn't hard.
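One possible shape for an offset-commit-only retry, sketched under
assumptions: MAX_COMMIT_RETRIES is a hypothetical internal constant,
RetriableCommitException is a hypothetical marker for errors worth retrying
(such as the coordinator moving during a broker bounce), and each commit
attempt is modeled as a Supplier returning a CompletableFuture. The caller's
future only fails once retries are exhausted or a non-retriable error occurs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

public class CommitRetry {
    // Hypothetical internal constant, not a user-facing config.
    static final int MAX_COMMIT_RETRIES = 3;

    // Hypothetical marker for errors that are worth retrying.
    static class RetriableCommitException extends RuntimeException {
        RetriableCommitException(String msg) { super(msg); }
    }

    /**
     * Runs one commit attempt; on a retriable failure, tries again until
     * MAX_COMMIT_RETRIES retries are used up. The returned future completes
     * exceptionally with the last error only when it gives up.
     */
    static CompletableFuture<Void> commitWithRetries(Supplier<CompletableFuture<Void>> attempt) {
        return attemptWithRetries(attempt, MAX_COMMIT_RETRIES);
    }

    private static CompletableFuture<Void> attemptWithRetries(
            Supplier<CompletableFuture<Void>> attempt, int retriesLeft) {
        return attempt.get()
            .handle((ok, err) -> {
                if (err == null) {
                    return CompletableFuture.<Void>completedFuture(null);
                }
                // Dependent stages wrap failures in CompletionException.
                Throwable cause = (err instanceof CompletionException) ? err.getCause() : err;
                if (cause instanceof RetriableCommitException && retriesLeft > 0) {
                    return attemptWithRetries(attempt, retriesLeft - 1);
                }
                return CompletableFuture.<Void>failedFuture(cause);
            })
            .thenCompose(f -> f); // flatten the nested future
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated commit that fails twice (e.g. during a broker bounce), then succeeds.
        Supplier<CompletableFuture<Void>> flaky = () -> {
            calls[0]++;
            return calls[0] <= 2
                ? CompletableFuture.<Void>failedFuture(new RetriableCommitException("coordinator not available"))
                : CompletableFuture.<Void>completedFuture(null);
        };
        commitWithRetries(flaky).join();
        System.out.println("succeeded after " + calls[0] + " attempts");
    }
}
```

Keeping the retry loop inside the returned future, rather than in the
caller, would give the blocking, callback, and fire-and-forget styles the
same behavior.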

2. The Future implementation is a bit unusual because the consumer doesn't
have a dedicated IO thread. The current implementation works by running
poll() in the thread that calls Future.get(), and while doing so it mutes
all non-coordinator nodes, which effectively pauses other processing. My
concern is that this could lead to unintuitive results: if you're handling
offset commits in a separate thread from the main consumer thread that calls
poll(), waiting on the Future might end up blocking the main thread. Then
again, I'm not sure the other nodes really even need to be muted -- maybe
Jay or Guozhang have ideas on this?
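A simplified, single-threaded sketch of a Future without a dedicated IO
thread: get() makes progress by calling poll() on whatever thread invoked it.
FakeNetworkClient and CommitFuture are hypothetical, node muting is not
modeled, the loop has no timeout, and nothing here is thread-safe. The only
point it shows is that the network I/O runs on the caller's thread, which is
why calling get() from a second thread interacts with the main poll() loop.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PollDrivenFuture {
    // Hypothetical stand-in for the consumer's network client. Each poll()
    // handles at most one pending response on whichever thread calls it.
    static class FakeNetworkClient {
        private final Queue<Runnable> pendingResponses = new ArrayDeque<>();
        String lastPollThread;

        void enqueueResponse(Runnable onResponse) {
            pendingResponses.add(onResponse);
        }

        void poll() {
            lastPollThread = Thread.currentThread().getName();
            Runnable response = pendingResponses.poll();
            if (response != null) response.run();
        }
    }

    // A commit future with no IO thread behind it: get() drives the client.
    static class CommitFuture {
        private final FakeNetworkClient client;
        private boolean done;
        private RuntimeException error;

        CommitFuture(FakeNetworkClient client) { this.client = client; }

        void complete(RuntimeException e) {
            this.error = e;
            this.done = true;
        }

        boolean isDone() { return done; }

        void get() {
            while (!done) {
                client.poll(); // network I/O happens on the caller's thread
            }
            if (error != null) throw error;
        }
    }

    public static void main(String[] args) {
        FakeNetworkClient client = new FakeNetworkClient();
        CommitFuture future = new CommitFuture(client);
        client.enqueueResponse(() -> {});                    // unrelated response
        client.enqueueResponse(() -> future.complete(null)); // the commit response
        future.get();
        System.out.println("commit completed by polling on " + client.lastPollThread);
    }
}
```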

3. Should the future be cancellable? This probably isn't hard to implement,
but I'm not sure we should even bother. On the one hand it could be nice,
especially if you have an old commit request that you want to supersede with
a new one with updated offsets. On the other hand, if the request has
already been sent out, cancelling it won't accomplish anything. I think the
only case where this is useful is when there are retries.
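A hypothetical sketch of the cancellation semantics described above: a
commit handle that can only be cancelled while it is waiting to be (re)sent,
for example during a retry backoff, and refuses once the request is in
flight. The State enum and CommitRequest class are illustrative only,
offsets are keyed by plain strings instead of TopicPartition, and this is
not wired into java.util.concurrent.Future.cancel().

```java
import java.util.Map;

public class CancellableCommit {
    enum State { PENDING, IN_FLIGHT, DONE, CANCELLED }

    // Hypothetical commit handle tracking whether the request is still
    // waiting to be (re)sent or is already on the wire.
    static class CommitRequest {
        final Map<String, Long> offsets;
        private State state = State.PENDING;

        CommitRequest(Map<String, Long> offsets) { this.offsets = offsets; }

        State state() { return state; }

        void markSent() { if (state == State.PENDING) state = State.IN_FLIGHT; }

        // The last attempt failed and a retry is scheduled.
        void markRetryPending() { if (state == State.IN_FLIGHT) state = State.PENDING; }

        void markDone() { if (state == State.IN_FLIGHT) state = State.DONE; }

        // Cancelling only helps before anything is sent; once the request is
        // in flight the broker may apply it anyway, so report failure.
        boolean cancel() {
            if (state == State.PENDING) {
                state = State.CANCELLED;
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        CommitRequest older = new CommitRequest(Map.of("events-0", 100L));
        older.markSent();
        older.markRetryPending(); // first attempt failed, waiting to retry

        // A newer commit with updated offsets supersedes the pending retry.
        CommitRequest newer = new CommitRequest(Map.of("events-0", 150L));
        System.out.println("older cancelled: " + older.cancel());

        newer.markSent();
        System.out.println("newer cancelled: " + newer.cancel());
    }
}
```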

Thoughts?

-- 
Thanks,
Ewen
