Thanks, great feedback everyone.

Jiangjie -- I was worried about interleaving as well. For commits using the
consumer's own current set of offsets, I agree we could easily limit to 1
outstanding request so the older one gets cancelled. For commits that
specify offsets manually, we might not need to do anything special, just
note in the docs that bad things can happen if you submit two separate
offset commit requests (i.e. don't wait for callbacks) and have retries
enabled. Alternatively, we could just serialize them, i.e. always have at
most 1 outstanding and retries always take priority.

Guozhang -- That breakdown of use cases looks right to me. I agree that I
don't think users of this API would be trying to use the futures for
promise pipelining or anything, the only use is to provide allow them to
block on the operation. That said, I think there are some tradeoffs to
consider between the two options:

Pros of Future/Cons of only having callback:
* Gives control over timeouts. With only callbacks users have to manage
this themselves if they care about timing. I think there's at least one
very common case for this: graceful shutdown where I want to try to commit
offsets, but after some time shutdown whether successful or not.
* Consistency. This matches the Producer interface and Futures are a
standard pattern.

Pros of only callback/cons of Future
* Know up front if its sync/async. This might simplify the implementation,
as Guozhang points out. (However, it also means the patch needs to add a
timeout mechanism, which doesn't exist yet. That's probably not a huge
patch, but non-trivial. Maybe it needs to be added regardless.)

Regardless of the interface we settle on, I'd argue that we should get rid
of the infinite retry version, at least limiting it to a max # of retries,
each of which are bound by a timeout. It's really problematic having it run
indefinitely long since it locks up the consumer, which means you can't,
e.g., shut down properly. More generally, I think anytime we we have an API
where a TimeoutException is *not* a possibility, we're almost definitely
trying to hide the network from the user in a way that makes it difficult
for them to write applications that behave correctly.

On the muting implementation, I'm not sure I'm convinced it's *required* to
mute the others entirely. Couldn't Selector.poll have a mechanism for
prioritizing reads rather than completely muting the other nodes? For
example, what if poll() did one select for just the key we're currently
keeping unmuted with the timeout, then if there are no keys ready to read,
reregister interest and select(0) to only get the ones that are immediately
ready.

Jay -- the polling thing isn't an issue if you just poll from within the
Future. That's how the WIP patch works. My only concern with that is that
it's unintuitive because most future implementations are just waiting for
another thread to finish some operation; the only potentially bad side
affect I could think of is that user callbacks might then run in the thread
calling Future.get(), which might be unexpected if they think they are
doing all the work of polling in a different thread.

-Ewen


On Wed, Apr 22, 2015 at 12:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com
> wrote:

> Hi Ewen,
>
> Only time I can think of where Application needs to know result of offset
> was committed or not during graceful shutdown and/or
> Runtime.addShutdownHook() so consumer application does not get duplicated
> records upon restart or does not have to deal with eliminating  already
> process offset.  Only thing that consumer application will have to handle
> is after XX retry failure to commit offset.  Or would prefer application to
> manage this last offset commit when offset can not be commit due to
> failure, connection timeout or any other failure case ?
>
> Thanks,
> Bhavesh
>
>
>
> On Wed, Apr 22, 2015 at 11:20 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > I second Guozhang's proposal. I do think we need the callback. The
> current
> > state is that for async commits you actually don't know if it succeeded.
> > However there is a totally valid case where you do need to know if it
> > succeeded but don't need to block, and without the callback you are
> stuck.
> > I think the futures will likely cause problems since blocking on the
> future
> > precludes polling which would allow it to complete.
> >
> > -Jay
> >
> > On Wed, Apr 22, 2015 at 11:12 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hi Ewen,
> > >
> > > I share the same concern you have about 2), that with the new API sync
> > > commit implementation is a bit awkward since we have a single-threaded
> > > design in new consumer. The reason that we need to mute other nodes for
> > > doing coordinator sync operations like join-group / offset commits /
> > offset
> > > fetches is to avoid long blocking due to possible "starvation" on
> network
> > > selector, so I think they need to be done still.
> > >
> > > On the other hand, I think users using the commit API will usually fall
> > > into three categories:
> > >
> > > 1) I really care that the offsets to be committed before moving on to
> > fetch
> > > more data, so I will wait FOREVER for it to complete.
> > >
> > > 2) I do not really care about whether it succeeds or not, so just fire
> > > "commit" and let's move on; if it fails it fails (and it will be
> logged).
> > >
> > > 3) I care if it succeeds or not, but I do not want to wait
> indefinitely;
> > so
> > > let me know if it does not finish within some timeout or failed (i.e.
> > give
> > > me the exceptions / error codes) and I will handle it.
> > >
> > > The current APIs does not handle case 3) above, which sits between
> BLOCK
> > > FOREVER and DO NOT CARE AT ALL, but most times people would not be very
> > > explicit about the exact "timeout", but just knowing it is definite and
> > > reasonably short is good enough. I think for this we probably do not
> need
> > > an extra timeout / retry settings, but rely on the general request
> retry
> > > settings; similarly we probably do not need "cancel".
> > >
> > > So I wonder if we can do a slightly different modification to API like
> > > this:
> > >
> > > void commit(Map<TopicPartition, Long> offsets, CommitType type,
> > > ConsumerCommitCallback callback);
> > >
> > > For case 1) people call "commit(offsets)" which will block forever
> until
> > it
> > > succeeds;
> > >
> > > For case 2) people call "commit(offsets, async)" which will return
> > > immediately, with not callback upon finishes;
> > >
> > > For case 3) people call "commit(offsets, async, callback)", and the
> > > callback will be executed when it finishes or #.request retries has
> > > exhausted.
> > >
> > > This API will make much smaller changes to the current implementations
> as
> > > well. Of course if we have a common scenario where users would really
> > care
> > > about the exact timeout for async commits, then Future may be a good
> > > approach.
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin
> <j...@linkedin.com.invalid
> > >
> > > wrote:
> > >
> > > > Hey Ewen,
> > > >
> > > > This makes sense. People usually do not want to stop consuming when
> > > > committing offsets.
> > > >
> > > > One corner case about async commit with retries I am thinking is that
> > it
> > > > is possible that two offset commits interleave with each other and
> that
> > > > might create problem. Like you said maybe we can cancel the previous
> > one.
> > > >
> > > > Another thing is that whether the future mechanism will only be
> applied
> > > to
> > > > auto commit or it will also be used in manual commit? Because in new
> > > > consumer we allow user to provide an offset map for offset commit.
> > Simply
> > > > canceling a previous pending offset commit does not seem to be ideal
> in
> > > > this case because the two commits could be for different partitions.
> > > >
> > > > Thanks.
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On 4/14/15, 4:31 PM, "Ewen Cheslack-Postava" <e...@confluent.io>
> > wrote:
> > > >
> > > > >I'd like to get some feedback on changing the offset commit API in
> the
> > > new
> > > > >consumer. Since this is user-facing API I wanted to make sure this
> > gets
> > > > >better visibility than the JIRA (
> > > > >https://issues.apache.org/jira/browse/KAFKA-2123) might.
> > > > >
> > > > >The motivation is to make it possible to do async commits but be
> able
> > to
> > > > >tell when the commit completes/fails. I'm suggesting changing the
> API
> > > from
> > > > >
> > > > >void commit(Map offsets, CommitType)
> > > > >
> > > > >to
> > > > >
> > > > >Future<Void> commit(Map<TopicPartition, Long> offsets,
> > > > >ConsumerCommitCallback callback);
> > > > >
> > > > >which matches the approach used for the producer. The
> > > > >ConsumerCommitCallback only has one method:
> > > > >
> > > > >public void onCompletion(Exception exception);
> > > > >
> > > > >This enables a few different use cases:
> > > > >
> > > > >* Blocking commit via Future.get(), and blocking with timeouts via
> > > > >Future.get(long, TimeUnit)
> > > > >* See exceptions via the future (see discussion of retries below)
> > > > >* Callback-based notification so you can keep processing messages
> and
> > > only
> > > > >take action if something goes wrong, takes too long, etc. This is
> the
> > > use
> > > > >case that motivated
> > > > >* Fire and forget commits via a shorthand commit() API and ignoring
> > the
> > > > >resulting future.
> > > > >
> > > > >One big difference between this and the producer API is that there
> > isn't
> > > > >any result (except maybe an exception) from commitOffsets. This
> leads
> > to
> > > > >the somewhat awkward Future<Void> signature. I personally prefer
> that
> > to
> > > > >the sync/async flag, especially since it also provides a
> non-blocking
> > > > >interface for checking whether the commit is complete.
> > > > >
> > > > >I posted a WIP patch to the JIRA. In the progress of making it I
> > found a
> > > > >few issues that might be worth discussing:
> > > > >
> > > > >1. Retries. In the old approach, this was trivial since it only
> > applied
> > > to
> > > > >synchronous calls, so we could just loop until the request was
> > > successful.
> > > > >Do we want to start introducing a retries mechanism here, and should
> > it
> > > > >apply to all types of requests or are we going to end up with a
> couple
> > > of
> > > > >different retry settings for specific cases, like offset commit. The
> > WIP
> > > > >patch allows errors to bubble up through the Future on the first
> > > failure,
> > > > >which right now can cause some tests to fail transiently (e.g.
> > consumer
> > > > >bounce test).
> > > > >
> > > > >I think some sort of retry mechanism, even if it's an internal
> > constant
> > > > >rather than configurable, is probably the right solution, but I want
> > to
> > > > >figure out how broadly they should apply. I think adding them only
> for
> > > > >offset commits isn't hard.
> > > > >
> > > > >2. The Future implementation is a bit weird because the consumer
> > doesn't
> > > > >have a dedicated IO thread. My only concern is that this could lead
> to
> > > > >some
> > > > >unintuitive results based on the current implementation because the
> > way
> > > > >this works is to just run poll() in the thread calling Future.get(),
> > but
> > > > >it
> > > > >mutes all non-coordinator nodes which means other processing is
> > > > >effectively
> > > > >paused. If you're processing offset commits in a separate thread
> from
> > > your
> > > > >main consumer thread that's calling poll(), you might just end up
> > > bocking
> > > > >the main thread while waiting on the Future. Then again, I'm not
> sure
> > > the
> > > > >other nodes really even need to be muted -- maybe Jay or Guozhang
> have
> > > > >ideas on this?
> > > > >
> > > > >3. Should the future be cancellable? This probably isn't hard to
> > > > >implement,
> > > > >but I'm not sure we should even bother. On the one hand it could be
> > > nice,
> > > > >especially if you have an old commit request that you want to
> > superseded
> > > > >by
> > > > >a new one with updated offsets. On the other hand, if the request
> has
> > > > >already been sent out, cancelling it won't accomplish anything. I
> > think
> > > > >the
> > > > >only case this is useful is when there are retries.
> > > > >
> > > > >Thoughts?
> > > > >
> > > > >--
> > > > >Thanks,
> > > > >Ewen
> > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
Thanks,
Ewen

Reply via email to