I second Guozhang's proposal. I do think we need the callback. The current
state is that for async commits you actually don't know if it succeeded.
However there is a totally valid case where you do need to know if it
succeeded but don't need to block, and without the callback you are stuck.
I think the futures will likely cause problems, since blocking on the future
precludes the polling that would allow it to complete.
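
To make the callback case concrete, here is a minimal Java sketch of a single-threaded client in which a commit only completes inside poll(). MiniConsumer, commitAsync and CommitCallback are hypothetical names used for illustration, not the actual consumer API.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical callback type with the onCompletion(Exception) shape discussed in this thread.
interface CommitCallback {
    void onCompletion(Exception exception);
}

// Toy model of a single-threaded consumer: there is no background IO thread,
// so pending commits only complete when the caller invokes poll().
class MiniConsumer {
    private final Queue<CommitCallback> pending = new ArrayDeque<>();

    // Returns immediately; the outcome is reported later through the callback.
    void commitAsync(CommitCallback callback) {
        pending.add(callback);
    }

    // Stands in for network IO: completes every pending commit and fires its callback.
    // Returns the number of commits completed by this call.
    int poll() {
        int completed = 0;
        CommitCallback cb;
        while ((cb = pending.poll()) != null) {
            cb.onCompletion(null); // null means success in this sketch
            completed++;
        }
        return completed;
    }
}

class CallbackDemo {
    public static void main(String[] args) {
        MiniConsumer consumer = new MiniConsumer();
        boolean[] done = {false};
        consumer.commitAsync(e -> done[0] = (e == null));
        System.out.println("before poll: " + done[0]); // prints "before poll: false"
        consumer.poll();
        System.out.println("after poll: " + done[0]);  // prints "after poll: true"
    }
}
```

In this model a caller that blocked on a future without calling poll() would never see it complete, whereas the callback fires as a side effect of normal polling.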

-Jay

On Wed, Apr 22, 2015 at 11:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Ewen,
>
> I share your concern about 2): with the new API, the sync commit
> implementation is a bit awkward because the new consumer has a
> single-threaded design. We mute the other nodes during coordinator sync
> operations (join-group, offset commits, offset fetches) to avoid long
> blocking caused by possible "starvation" on the network selector, so I
> think the muting still needs to be done.
>
> On the other hand, I think users using the commit API will usually fall
> into three categories:
>
> 1) I really care that the offsets are committed before moving on to fetch
> more data, so I will wait FOREVER for the commit to complete.
>
> 2) I do not really care about whether it succeeds or not, so just fire
> "commit" and let's move on; if it fails it fails (and it will be logged).
>
> 3) I care if it succeeds or not, but I do not want to wait indefinitely; so
> let me know if it fails or does not finish within some timeout (i.e. give
> me the exceptions / error codes) and I will handle it.
>
> The current API does not handle case 3) above, which sits between BLOCK
> FOREVER and DO NOT CARE AT ALL. Most of the time people would not be very
> explicit about the exact "timeout"; just knowing it is bounded and
> reasonably short is good enough. For this I think we probably do not need
> extra timeout / retry settings and can rely on the general request retry
> settings; similarly we probably do not need "cancel".
>
> So I wonder if we can make a slightly different modification to the API,
> like this:
>
> void commit(Map<TopicPartition, Long> offsets, CommitType type,
> ConsumerCommitCallback callback);
>
> For case 1) people call "commit(offsets)" which will block forever until it
> succeeds;
>
> For case 2) people call "commit(offsets, async)", which will return
> immediately, with no callback when it finishes;
>
> For case 3) people call "commit(offsets, async, callback)", and the
> callback will be executed when it finishes or when the request retries
> have been exhausted.
>
> This API will also require much smaller changes to the current
> implementation. Of course, if we have a common scenario where users really
> care about the exact timeout for async commits, then Future may be a good
> approach.
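
The three call styles above could look roughly like the following Java sketch. It is only an illustration under stated assumptions: ThreeCaseConsumer and OffsetStore are hypothetical, String keys stand in for TopicPartition, maxRetries stands in for the general request retry setting, and async work is completed inside poll() because there is no IO thread.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

enum CommitType { SYNC, ASYNC }

interface ConsumerCommitCallback {
    void onCompletion(Exception exception);
}

// Hypothetical stand-in for the coordinator round trip; throws on failure.
interface OffsetStore {
    void write(Map<String, Long> offsets) throws Exception;
}

class ThreeCaseConsumer {
    private final OffsetStore store;
    private final int maxRetries; // assumption: models the general request retry setting
    private final Queue<Runnable> pending = new ArrayDeque<>();

    ThreeCaseConsumer(OffsetStore store, int maxRetries) {
        this.store = store;
        this.maxRetries = maxRetries;
    }

    // Case 1: block until the commit succeeds.
    void commit(Map<String, Long> offsets) {
        commit(offsets, CommitType.SYNC, null);
    }

    // Case 2: fire and forget (a real client would log failures).
    void commit(Map<String, Long> offsets, CommitType type) {
        commit(offsets, type, null);
    }

    // Case 3: async, with the outcome delivered to the callback.
    void commit(Map<String, Long> offsets, CommitType type, ConsumerCommitCallback callback) {
        Map<String, Long> snapshot = new HashMap<>(offsets);
        if (type == CommitType.SYNC) {
            while (tryWrite(snapshot) != null) {
                // keep retrying: sync commits wait until success
            }
            if (callback != null) callback.onCompletion(null);
            return;
        }
        pending.add(() -> {
            Exception last = null;
            // One initial attempt plus at most maxRetries retries.
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                last = tryWrite(snapshot);
                if (last == null) break;
            }
            if (callback != null) callback.onCompletion(last);
        });
    }

    // Async commits make progress only here, as in a single-threaded client.
    void poll() {
        Runnable task;
        while ((task = pending.poll()) != null) task.run();
    }

    private Exception tryWrite(Map<String, Long> offsets) {
        try {
            store.write(offsets);
            return null;
        } catch (Exception e) {
            return e;
        }
    }
}
```

With this shape the callback receives null on success, or the last exception once the retries are used up, so the caller gets a bounded wait without choosing an explicit timeout.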
>
> Guozhang
>
>
> On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
> > Hey Ewen,
> >
> > This makes sense. People usually do not want to stop consuming when
> > committing offsets.
> >
> > One corner case I am thinking of for async commit with retries is that
> > two offset commits could interleave with each other, which might create
> > problems. Like you said, maybe we can cancel the previous one.
> >
> > Another question is whether the future mechanism will only be applied to
> > auto commit or will also be used for manual commit. In the new consumer
> > we allow the user to provide an offset map for offset commit, so simply
> > canceling a previous pending offset commit does not seem ideal, because
> > the two commits could be for different partitions.
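
The different-partitions concern can be seen in a small Java sketch. PendingCommits is hypothetical and String keys stand in for TopicPartition; the merge variant is shown only for contrast and is not something proposed in this thread.

```java
import java.util.HashMap;
import java.util.Map;

class PendingCommits {
    // Cancel-and-replace: the newer commit wholly supersedes the pending one,
    // so offsets for partitions that only the older commit covered are lost.
    static Map<String, Long> cancelPrevious(Map<String, Long> pending, Map<String, Long> next) {
        return new HashMap<>(next);
    }

    // Per-partition merge: newer offsets win only where both commits
    // mention the same partition; everything else is kept.
    static Map<String, Long> merge(Map<String, Long> pending, Map<String, Long> next) {
        Map<String, Long> merged = new HashMap<>(pending);
        merged.putAll(next);
        return merged;
    }
}

class PendingCommitDemo {
    public static void main(String[] args) {
        Map<String, Long> first = new HashMap<>();
        first.put("topic-0", 100L);
        Map<String, Long> second = new HashMap<>();
        second.put("topic-1", 7L);

        // prints "false": the offset for topic-0 was dropped with the cancelled commit
        System.out.println(PendingCommits.cancelPrevious(first, second).containsKey("topic-0"));
        // prints "true": both partitions survive
        System.out.println(PendingCommits.merge(first, second).containsKey("topic-0"));
    }
}
```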
> >
> > Thanks.
> >
> > Jiangjie (Becket) Qin
> >
> > On 4/14/15, 4:31 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> >
> > >I'd like to get some feedback on changing the offset commit API in the
> > >new consumer. Since this is a user-facing API I wanted to make sure this
> > >gets better visibility than the JIRA (
> > >https://issues.apache.org/jira/browse/KAFKA-2123) might.
> > >
> > >The motivation is to make it possible to do async commits but be able to
> > >tell when the commit completes/fails. I'm suggesting changing the API
> > >from
> > >
> > >void commit(Map offsets, CommitType)
> > >
> > >to
> > >
> > >Future<Void> commit(Map<TopicPartition, Long> offsets,
> > >ConsumerCommitCallback callback);
> > >
> > >which matches the approach used for the producer. The
> > >ConsumerCommitCallback only has one method:
> > >
> > >public void onCompletion(Exception exception);
> > >
> > >This enables a few different use cases:
> > >
> > >* Blocking commit via Future.get(), and blocking with timeouts via
> > >Future.get(long, TimeUnit)
> > >* See exceptions via the future (see discussion of retries below)
> > >* Callback-based notification so you can keep processing messages and
> > >only take action if something goes wrong, takes too long, etc. This is
> > >the use case that motivated the proposal.
> > >* Fire and forget commits via a shorthand commit() API and ignoring the
> > >resulting future.
> > >
> > >One big difference between this and the producer API is that there isn't
> > >any result (except maybe an exception) from commitOffsets. This leads to
> > >the somewhat awkward Future<Void> signature. I personally prefer that to
> > >the sync/async flag, especially since it also provides a non-blocking
> > >interface for checking whether the commit is complete.
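
The use cases listed above can be sketched in Java with a CompletableFuture. This is a hedged illustration, not the WIP patch: FutureCommitSketch and CommitListener are hypothetical, String keys stand in for TopicPartition, and the sketch assumes a background thread completes the commit, which the new consumer does not have.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Same single-method shape as the ConsumerCommitCallback described above.
interface CommitListener {
    void onCompletion(Exception exception);
}

class FutureCommitSketch implements AutoCloseable {
    // Assumption: a background thread plays the role of an IO thread.
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    Future<Void> commit(Map<String, Long> offsets, CommitListener callback) {
        Map<String, Long> snapshot = new HashMap<>(offsets);
        CompletableFuture<Void> result = new CompletableFuture<>();
        io.execute(() -> {
            // Placeholder for the real request: an empty commit is treated as a failure.
            Exception error = snapshot.isEmpty()
                    ? new IllegalArgumentException("no offsets to commit")
                    : null;
            // Notify the callback first, then complete the future.
            if (callback != null) callback.onCompletion(error);
            if (error == null) {
                result.complete(null);
            } else {
                result.completeExceptionally(error);
            }
        });
        return result;
    }

    @Override
    public void close() {
        io.shutdown(); // already-submitted commits still run
    }
}

class FutureCommitDemo {
    public static void main(String[] args) throws Exception {
        try (FutureCommitSketch consumer = new FutureCommitSketch()) {
            Map<String, Long> offsets = new HashMap<>();
            offsets.put("topic-0", 42L);

            consumer.commit(offsets, null).get();                    // blocking commit
            consumer.commit(offsets, null).get(1, TimeUnit.SECONDS); // blocking with timeout
            consumer.commit(offsets, e -> {                          // callback notification
                if (e != null) System.err.println("commit failed: " + e);
            });
            consumer.commit(offsets, null);                          // fire and forget
        }
    }
}
```

Failures surface through Future.get() as an ExecutionException wrapping the original exception, while isDone() gives the non-blocking check.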
> > >
> > >I posted a WIP patch to the JIRA. In the process of making it I found a
> > >few issues that might be worth discussing:
> > >
> > >1. Retries. In the old approach, this was trivial since it only applied
> > >to synchronous calls, so we could just loop until the request was
> > >successful. Do we want to start introducing a retries mechanism here,
> > >and should it apply to all types of requests, or are we going to end up
> > >with a couple of different retry settings for specific cases, like
> > >offset commit? The WIP patch allows errors to bubble up through the
> > >Future on the first failure, which right now can cause some tests to
> > >fail transiently (e.g. consumer bounce test).
> > >
> > >I think some sort of retry mechanism, even if it's an internal constant
> > >rather than configurable, is probably the right solution, but I want to
> > >figure out how broadly they should apply. I think adding them only for
> > >offset commits isn't hard.
> > >
> > >2. The Future implementation is a bit weird because the consumer doesn't
> > >have a dedicated IO thread. My only concern is that this could lead to
> > >some unintuitive results based on the current implementation, because
> > >the way this works is to just run poll() in the thread calling
> > >Future.get(), but it mutes all non-coordinator nodes, which means other
> > >processing is effectively paused. If you're processing offset commits in
> > >a separate thread from your main consumer thread that's calling poll(),
> > >you might just end up blocking the main thread while waiting on the
> > >Future. Then again, I'm not sure the other nodes really even need to be
> > >muted -- maybe Jay or Guozhang have ideas on this?
> > >
> > >3. Should the future be cancellable? This probably isn't hard to
> > >implement, but I'm not sure we should even bother. On the one hand it
> > >could be nice, especially if you have an old commit request that you
> > >want superseded by a new one with updated offsets. On the other hand,
> > >if the request has already been sent out, cancelling it won't
> > >accomplish anything. I think the only case where this is useful is when
> > >there are retries.
> > >
> > >Thoughts?
> > >
> > >--
> > >Thanks,
> > >Ewen
> >
> >
>
>
> --
> -- Guozhang
>
