Hi Ewen,

I share your concern about 2): with the new API, the sync commit
implementation is a bit awkward since the new consumer has a
single-threaded design. The reason we need to mute other nodes while
doing coordinator sync operations like join-group / offset commits / offset
fetches is to avoid long blocking due to possible "starvation" on the
network selector, so I think the muting still needs to be done.

On the other hand, I think users of the commit API will usually fall
into three categories:

1) I really care that the offsets are committed before moving on to fetch
more data, so I will wait FOREVER for it to complete.

2) I do not really care whether it succeeds or not, so just fire the
"commit" and let's move on; if it fails it fails (and it will be logged).

3) I care whether it succeeds or not, but I do not want to wait
indefinitely; so let me know if it does not finish within some timeout or
if it fails (i.e. give me the exceptions / error codes) and I will handle it.

The current API does not handle case 3) above, which sits between BLOCK
FOREVER and DO NOT CARE AT ALL. Most of the time people would not be very
explicit about the exact "timeout"; just knowing that it is bounded and
reasonably short is good enough. So I think for this we probably do not
need extra timeout / retry settings, but can rely on the general request
retry settings; similarly, we probably do not need "cancel".
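To make "bounded by the general retry settings" concrete, here is a minimal sketch. It is not consumer code: `retries` and `backoffMs` are hypothetical stand-ins for the general request retry settings, and `sendCommit` stands in for one offset commit round trip. It loops in place for simplicity; in the single-threaded consumer the attempts would be driven by poll().

```java
import java.util.function.BooleanSupplier;

/**
 * Minimal sketch, not consumer code: a commit whose outcome is bounded by a
 * retry budget rather than a commit-specific timeout. "retries" and
 * "backoffMs" are hypothetical stand-ins for the general request retry
 * settings; sendCommit stands in for one commit round trip.
 */
public class BoundedRetryCommit {

    /** What the caller learns once an attempt succeeds or the budget is spent. */
    public static final class Outcome {
        public final int attempts;
        public final Exception error; // null on success

        Outcome(int attempts, Exception error) {
            this.attempts = attempts;
            this.error = error;
        }
    }

    /**
     * One initial attempt plus up to `retries` retries, with `backoffMs`
     * between attempts, so the wait is bounded by roughly
     * (retries + 1) round trips + retries * backoffMs.
     */
    public static Outcome commitWithRetries(BooleanSupplier sendCommit, int retries, long backoffMs) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (sendCommit.getAsBoolean()) {
                return new Outcome(attempts, null);
            }
            if (attempts > retries) {
                return new Outcome(attempts,
                        new RuntimeException("offset commit failed after " + attempts + " attempts"));
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return new Outcome(attempts, e);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice, then succeeds.
        Outcome ok = commitWithRetries(() -> ++calls[0] >= 3, 5, 10);
        System.out.println("attempts=" + ok.attempts + ", error=" + ok.error);

        // Never succeeds, but the caller still hears back after 1 + 2 attempts.
        Outcome failed = commitWithRetries(() -> false, 2, 10);
        System.out.println("attempts=" + failed.attempts + ", error=" + failed.error.getMessage());
    }
}
```

The point of the design is that the caller never configures a commit timeout: the worst case falls out of the retry count and backoff that already exist for other requests.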

So I wonder if we can make a slightly different modification to the API, like this:

void commit(Map<TopicPartition, Long> offsets, CommitType type,
ConsumerCommitCallback callback);

For case 1) people call "commit(offsets)", which will block forever until it
succeeds;

For case 2) people call "commit(offsets, async)", which will return
immediately, with no callback upon completion;

For case 3) people call "commit(offsets, async, callback)", and the
callback will be executed when the commit finishes or when the number of
request retries has been exhausted.
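A minimal sketch of how the three calls could map onto one method. `CommitType`, `ToyConsumer` and the simulated failures are hypothetical; the callback uses the single `onCompletion(Exception)` method from Ewen's proposal, and partitions are plain strings instead of TopicPartition to keep it self-contained. Async commits are only queued and are completed inside `poll()`, mirroring the single-threaded design.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of commit(offsets, type, callback); not the real
 * KafkaConsumer API. Partitions are plain strings, failures are simulated.
 */
public class CommitModesSketch {

    enum CommitType { SYNC, ASYNC }

    /** Same single method as the proposed ConsumerCommitCallback. */
    interface ConsumerCommitCallback {
        void onCompletion(Exception exception); // null means success
    }

    static class ToyConsumer {
        final Map<String, Long> committed = new HashMap<>();
        private final List<Runnable> pending = new ArrayList<>();
        private final int retries;   // stand-in for the general request retry setting
        private int failuresLeft;    // simulated transient broker errors

        ToyConsumer(int retries, int failuresLeft) {
            this.retries = retries;
            this.failuresLeft = failuresLeft;
        }

        /** Case 1: block until the commit succeeds, however long that takes. */
        void commit(Map<String, Long> offsets) {
            while (!tryCommit(offsets)) {
                // keep retrying; a real client would back off between attempts
            }
        }

        /** Case 2: fire and forget; a failure is only logged. */
        void commit(Map<String, Long> offsets, CommitType type) {
            commit(offsets, type, null);
        }

        /** Case 3: return immediately, report the outcome via the callback. */
        void commit(Map<String, Long> offsets, CommitType type, ConsumerCommitCallback callback) {
            if (type == CommitType.SYNC) {
                commit(offsets);
                if (callback != null) callback.onCompletion(null);
                return;
            }
            // ASYNC: queue the work; it only runs inside a later poll().
            pending.add(() -> {
                Exception error = null;
                int retried = 0;
                while (!tryCommit(offsets)) {
                    if (++retried > retries) {
                        error = new RuntimeException("offset commit failed, retries exhausted");
                        break;
                    }
                }
                if (callback != null) {
                    callback.onCompletion(error);
                } else if (error != null) {
                    System.err.println("async commit failed: " + error.getMessage());
                }
            });
        }

        /** Stand-in for the consumer's poll(): completes queued async commits. */
        void poll() {
            List<Runnable> toRun = new ArrayList<>(pending);
            pending.clear();
            toRun.forEach(Runnable::run);
        }

        private boolean tryCommit(Map<String, Long> offsets) {
            if (failuresLeft > 0) {
                failuresLeft--;
                return false;
            }
            committed.putAll(offsets);
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = Map.of("my-topic-0", 42L);

        ToyConsumer c1 = new ToyConsumer(0, 3);
        c1.commit(offsets);                              // case 1
        System.out.println("case 1 committed " + c1.committed);

        ToyConsumer c2 = new ToyConsumer(1, 5);
        c2.commit(offsets, CommitType.ASYNC);            // case 2
        c2.poll();                                       // failure is only logged

        ToyConsumer c3 = new ToyConsumer(2, 1);
        c3.commit(offsets, CommitType.ASYNC,             // case 3
                e -> System.out.println("case 3: " + (e == null ? "ok" : e.getMessage())));
        c3.poll();
    }
}
```

Because the callback runs on the caller's thread during poll(), this shape needs no dedicated IO thread.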

This API would also require much smaller changes to the current
implementation. Of course, if there is a common scenario where users really
care about the exact timeout for async commits, then a Future may be a good
approach.
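For that exact-timeout case, a sketch of the Future-based alternative, assuming a hypothetical `commitAsync` that returns a `CompletableFuture<Void>`; a daemon scheduled executor fakes the broker's response latency. The real consumer has no IO thread, so its `Future.get()` would have to drive poll() itself, which is the concern in 2).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Sketch of the Future-based alternative; commitAsync is hypothetical.
 * A daemon scheduler fakes the broker round trip. The real consumer has
 * no IO thread, so its Future.get() would have to drive poll() itself.
 */
public class FutureCommitSketch {

    private static final ScheduledExecutorService FAKE_BROKER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "fake-broker");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });

    /** Hypothetical async commit whose response arrives after latencyMs. */
    static CompletableFuture<Void> commitAsync(long latencyMs) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        FAKE_BROKER.schedule(() -> { result.complete(null); }, latencyMs, TimeUnit.MILLISECONDS);
        return result;
    }

    /** Caller-chosen exact timeout: true if the commit completed in time. */
    static boolean commitWithin(long latencyMs, long timeoutMs) {
        try {
            commitAsync(latencyMs).get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // still in flight; the caller decides what to do next
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("commit failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println("10 ms commit, 500 ms timeout: " + commitWithin(10, 500));
        System.out.println("1 s commit, 50 ms timeout: " + commitWithin(1000, 50));
    }
}
```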

Guozhang


On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Hey Ewen,
>
> This makes sense. People usually do not want to stop consuming when
> committing offsets.
>
> One corner case with async commits and retries that I am thinking of is
> that two offset commits could interleave with each other, which might
> create problems. As you said, maybe we can cancel the previous one.
>
> Another question is whether the future mechanism will only be applied to
> auto commit or will also be used for manual commit. In the new consumer
> we allow users to provide an offset map for the offset commit, so simply
> canceling a previous pending offset commit does not seem ideal in this
> case, because the two commits could be for different partitions.
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 4/14/15, 4:31 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>
> >I'd like to get some feedback on changing the offset commit API in the new
> >consumer. Since this is user-facing API I wanted to make sure this gets
> >better visibility than the JIRA (
> >https://issues.apache.org/jira/browse/KAFKA-2123) might.
> >
> >The motivation is to make it possible to do async commits but be able to
> >tell when the commit completes/fails. I'm suggesting changing the API from
> >
> >void commit(Map offsets, CommitType)
> >
> >to
> >
> >Future<Void> commit(Map<TopicPartition, Long> offsets,
> >ConsumerCommitCallback callback);
> >
> >which matches the approach used for the producer. The
> >ConsumerCommitCallback only has one method:
> >
> >public void onCompletion(Exception exception);
> >
> >This enables a few different use cases:
> >
> >* Blocking commit via Future.get(), and blocking with timeouts via
> >Future.get(long, TimeUnit)
> >* See exceptions via the future (see discussion of retries below)
> >* Callback-based notification so you can keep processing messages and only
> >take action if something goes wrong, takes too long, etc. This is the use
> >case that motivated this change.
> >* Fire and forget commits via a shorthand commit() API and ignoring the
> >resulting future.
> >
> >One big difference between this and the producer API is that there isn't
> >any result (except maybe an exception) from commitOffsets. This leads to
> >the somewhat awkward Future<Void> signature. I personally prefer that to
> >the sync/async flag, especially since it also provides a non-blocking
> >interface for checking whether the commit is complete.
> >
> >I posted a WIP patch to the JIRA. In the process of making it I found a
> >few issues that might be worth discussing:
> >
> >1. Retries. In the old approach, this was trivial since it only applied to
> >synchronous calls, so we could just loop until the request was successful.
> >Do we want to start introducing a retry mechanism here, and should it
> >apply to all types of requests, or are we going to end up with a couple of
> >different retry settings for specific cases, like offset commit? The WIP
> >patch allows errors to bubble up through the Future on the first failure,
> >which right now can cause some tests to fail transiently (e.g. consumer
> >bounce test).
> >
> >I think some sort of retry mechanism, even if it's an internal constant
> >rather than configurable, is probably the right solution, but I want to
> >figure out how broadly they should apply. I think adding them only for
> >offset commits isn't hard.
> >
> >2. The Future implementation is a bit weird because the consumer doesn't
> >have a dedicated IO thread. My only concern is that this could lead to some
> >unintuitive results with the current implementation, because the way this
> >works is to just run poll() in the thread calling Future.get(), but it
> >mutes all non-coordinator nodes, which means other processing is
> >effectively paused. If you're processing offset commits in a separate
> >thread from your main consumer thread that's calling poll(), you might just
> >end up blocking the main thread while waiting on the Future. Then again,
> >I'm not sure the other nodes really even need to be muted -- maybe Jay or
> >Guozhang have ideas on this?
> >
> >3. Should the future be cancellable? This probably isn't hard to implement,
> >but I'm not sure we should even bother. On the one hand it could be nice,
> >especially if you have an old commit request that you want superseded by
> >a new one with updated offsets. On the other hand, if the request has
> >already been sent out, cancelling it won't accomplish anything. I think the
> >only case where this is useful is when there are retries.
> >
> >Thoughts?
> >
> >--
> >Thanks,
> >Ewen
>
>


-- 
-- Guozhang
