> same time I would prefer additional churn to the API.

I meant to say prefer _less_ churn to the API.

On Tue, Jun 09, 2015 at 08:35:35AM -0700, Joel Koshy wrote:
> Ewen,
> 
> Sorry for the late comment, but while we are discussing this API I
> think we should also consider the addition of metadata to the offset
> commit. This is supported on the broker-side but there is no means in
> the current API to include metadata or retention time in the offset
> commit.  Ideally, this would also mean changing the "committed" API to
> return the offset + metadata. I realize this is orthogonal to what you
> are trying to achieve, so I'm okay with breaking this out - at the
> same time I would prefer additional churn to the API.
> 
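To make the metadata suggestion concrete, here is a minimal sketch of committing and reading back offset + metadata. Everything in it is an assumption for illustration: OffsetWithMetadata and InMemoryOffsetStore are hypothetical names, partitions are plain strings rather than TopicPartition, and an in-memory map stands in for the broker.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value type pairing an offset with free-form metadata.
final class OffsetWithMetadata {
    final long offset;
    final String metadata;

    OffsetWithMetadata(long offset, String metadata) {
        this.offset = offset;
        // Treat a missing metadata string as empty so callers never see null.
        this.metadata = metadata == null ? "" : metadata;
    }
}

// Hypothetical in-memory stand-in for the broker-side offset storage.
final class InMemoryOffsetStore {
    private final Map<String, OffsetWithMetadata> committed = new HashMap<>();

    // Commit carries the metadata along with each offset.
    void commit(Map<String, OffsetWithMetadata> offsets) {
        committed.putAll(offsets);
    }

    // "committed" returns offset + metadata, or null if nothing was committed.
    OffsetWithMetadata committed(String partition) {
        return committed.get(partition);
    }
}
```

The point is only that the same type flows through both commit and committed, so adding metadata is one API change instead of two.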
> Thanks,
> 
> Joel
> 
> On Mon, Apr 27, 2015 at 06:03:05PM -0700, Guozhang Wang wrote:
> > Thanks Ewen,
> > 
> > 1. I agree with you on the pros of Future; my argument for a pure
> > Callback, as I mentioned, is that it seems unlikely to me that users would
> > usually want to set a timeout explicitly. That said, if people think this
> > scenario is actually common, as in graceful shutdown, then I am OK with
> > Future over just the callback. My concern about Future is that after
> > thinking through the code myself I feel the implementation would be quite
> > complicated.
> > 
> > 2. As for infinite retries, for now we have three "coordinator-blocking"
> > calls: one for join-group, one for sync-commit-offsets, and one for
> > fetch-offsets; I think for these three it is appropriate to use blocking
> > calls, since the consumer cannot proceed anyways without getting the
> > response from these calls according to their semantics.
> > 
> > 3. With the priority mechanism I think we can then get rid of the muting
> > mechanism. Not sure if it is possible to implement in
> > java.nio.channels.Selector?
> > 
> > Guozhang
> > 
> > 
> > 
> > On Thu, Apr 23, 2015 at 2:25 PM, Ewen Cheslack-Postava <e...@confluent.io>
> > wrote:
> > 
> > > Thanks, great feedback everyone.
> > >
> > > Jiangjie -- I was worried about interleaving as well. For commits using the
> > > consumer's own current set of offsets, I agree we could easily limit to 1
> > > outstanding request so the older one gets cancelled. For commits that
> > > specify offsets manually, we might not need to do anything special, just
> > > note in the docs that bad things can happen if you submit two separate
> > > offset commit requests (i.e. don't wait for callbacks) and have retries
> > > enabled. Alternatively, we could just serialize them, i.e. always have at
> > > most 1 outstanding and retries always take priority.
> > >
> > > Guozhang -- That breakdown of use cases looks right to me. I agree that I
> > > don't think users of this API would be trying to use the futures for
> > > promise pipelining or anything, the only use is to allow them to
> > > block on the operation. That said, I think there are some tradeoffs to
> > > consider between the two options:
> > >
> > > Pros of Future/Cons of only having callback:
> > > * Gives control over timeouts. With only callbacks users have to manage
> > > this themselves if they care about timing. I think there's at least one
> > > very common case for this: graceful shutdown where I want to try to commit
> > > offsets, but after some time shutdown whether successful or not.
> > > * Consistency. This matches the Producer interface and Futures are a
> > > standard pattern.
> > >
> > > Pros of only callback/cons of Future
> > > * Know up front if it's sync/async. This might simplify the implementation,
> > > as Guozhang points out. (However, it also means the patch needs to add a
> > > timeout mechanism, which doesn't exist yet. That's probably not a huge
> > > patch, but non-trivial. Maybe it needs to be added regardless.)
> > >
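A minimal sketch of the graceful-shutdown case from the first bullet, assuming the commit hands back a standard java.util.concurrent.Future. ShutdownSketch and commitThenClose are hypothetical names, and the close Runnable stands in for whatever cleanup the application does.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ShutdownSketch {
    /**
     * Waits a bounded time for a pending commit, then closes regardless.
     * Returns true only if the commit completed successfully in time.
     */
    static boolean commitThenClose(Future<Void> pendingCommit, long timeoutMs, Runnable close) {
        boolean committed = false;
        try {
            pendingCommit.get(timeoutMs, TimeUnit.MILLISECONDS);
            committed = true;
        } catch (TimeoutException e) {
            // Took too long: stop waiting and shut down anyway.
            pendingCommit.cancel(true);
        } catch (ExecutionException e) {
            // The commit failed; nothing more to do during shutdown.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            close.run();
        }
        return committed;
    }
}
```

With a callback-only API the application would have to build the timer and the "did it finish yet" flag itself, which is the cost the bullet describes.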
> > > Regardless of the interface we settle on, I'd argue that we should get rid
> > > of the infinite retry version, at least limiting it to a max # of retries,
> > > each of which is bounded by a timeout. It's really problematic having it run
> > > indefinitely since it locks up the consumer, which means you can't,
> > > e.g., shut down properly. More generally, I think anytime we have an API
> > > where a TimeoutException is *not* a possibility, we're almost definitely
> > > trying to hide the network from the user in a way that makes it difficult
> > > for them to write applications that behave correctly.
> > >
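A minimal sketch of "max # of retries, each bounded by a timeout", under stated assumptions: BoundedRetry and CommitTimeoutException are hypothetical names, and the Supplier stands in for whatever sends one commit request and returns a Future for it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical unchecked exception surfaced once all attempts are used up.
final class CommitTimeoutException extends RuntimeException {
    CommitTimeoutException(String message) {
        super(message);
    }
}

final class BoundedRetry {
    /** Returns the attempt number that succeeded, or throws after maxAttempts. */
    static int commitWithRetries(Supplier<Future<Void>> sendCommit,
                                 int maxAttempts, long perAttemptTimeoutMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Future<Void> inFlight = sendCommit.get();
            try {
                inFlight.get(perAttemptTimeoutMs, TimeUnit.MILLISECONDS);
                return attempt;
            } catch (TimeoutException | ExecutionException e) {
                // Give up on this attempt and fall through to the next one.
                inFlight.cancel(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CommitTimeoutException("interrupted while committing");
            }
        }
        throw new CommitTimeoutException(
            "commit did not complete after " + maxAttempts + " attempts");
    }
}
```

The worst case is bounded by maxAttempts * perAttemptTimeoutMs, so the caller can always get control back and shut down.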
> > > On the muting implementation, I'm not sure I'm convinced it's *required* to
> > > mute the others entirely. Couldn't Selector.poll have a mechanism for
> > > prioritizing reads rather than completely muting the other nodes? For
> > > example, what if poll() did one select for just the key we're currently
> > > keeping unmuted with the timeout, then if there are no keys ready to read,
> > > reregister interest and select(0) to only get the ones that are immediately
> > > ready.
> > >
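A rough sketch of the two-phase select idea against plain java.nio, not against Kafka's own Selector wrapper. PrioritySelectSketch and its methods are hypothetical names. One detail worth noting: in java.nio, Selector.select(0) blocks indefinitely, so the "only what is ready right now" step uses selectNow() here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class PrioritySelectSketch {
    /** Waits up to timeoutMs for the priority key; if it is not ready, returns whatever else is. */
    static Set<SelectionKey> poll(Selector selector, SelectionKey priority, long timeoutMs) {
        try {
            // Phase 1: temporarily drop interest on every key except the priority one.
            Map<SelectionKey, Integer> saved = new HashMap<>();
            for (SelectionKey key : selector.keys()) {
                if (key != priority) {
                    saved.put(key, key.interestOps());
                    key.interestOps(0);
                }
            }
            selector.select(Math.max(1, timeoutMs)); // select(0) would block forever
            Set<SelectionKey> ready = new HashSet<>(selector.selectedKeys());
            selector.selectedKeys().clear();

            // Phase 2: restore interest and pick up keys that are ready immediately.
            for (Map.Entry<SelectionKey, Integer> entry : saved.entrySet()) {
                entry.getKey().interestOps(entry.getValue());
            }
            if (ready.isEmpty()) {
                selector.selectNow();
                ready.addAll(selector.selectedKeys());
                selector.selectedKeys().clear();
            }
            return ready;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Two-pipe demo: true if the low-priority key is served only when the priority key is idle. */
    static boolean demo() {
        try (Selector selector = Selector.open()) {
            Pipe high = Pipe.open();
            Pipe low = Pipe.open();
            high.source().configureBlocking(false);
            low.source().configureBlocking(false);
            SelectionKey highKey = high.source().register(selector, SelectionKey.OP_READ);
            low.source().register(selector, SelectionKey.OP_READ);

            low.sink().write(ByteBuffer.wrap(new byte[] {1}));
            Set<SelectionKey> onlyLow = poll(selector, highKey, 20);  // falls through to phase 2
            high.sink().write(ByteBuffer.wrap(new byte[] {2}));
            Set<SelectionKey> both = poll(selector, highKey, 20);     // phase 1 returns first

            boolean result = onlyLow.size() == 1 && !onlyLow.contains(highKey)
                && both.size() == 1 && both.contains(highKey);
            high.sink().close();
            high.source().close();
            low.sink().close();
            low.source().close();
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The other nodes are never muted for longer than one phase-1 timeout, which is the difference from muting them for the whole blocking call.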
> > > Jay -- the polling thing isn't an issue if you just poll from within the
> > > Future. That's how the WIP patch works. My only concern with that is that
> > > it's unintuitive because most future implementations are just waiting for
> > > another thread to finish some operation; the only potentially bad side
> > > effect I could think of is that user callbacks might then run in the thread
> > > calling Future.get(), which might be unexpected if they think they are
> > > doing all the work of polling in a different thread.
> > >
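A minimal sketch of a Future that does the polling inside get(), to show why callbacks end up on the caller's thread. This is not the WIP patch: PollingFuture is a hypothetical name and the pollOnce Runnable stands in for one pass of the consumer's network loop.

```java
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PollingFuture implements Future<Void> {
    private final Runnable pollOnce;     // hypothetical hook: one pass of network I/O
    private volatile boolean done;
    private Thread completedOn;

    PollingFuture(Runnable pollOnce) {
        this.pollOnce = pollOnce;
    }

    // Called by the I/O code when the commit response arrives.
    void complete() {
        completedOn = Thread.currentThread();
        done = true;
    }

    Thread completedOn() {
        return completedOn;
    }

    @Override
    public Void get() {
        // No other thread will finish the work, so the caller drives the polling.
        while (!done) {
            pollOnce.run();
        }
        return null;
    }

    @Override
    public Void get(long timeout, TimeUnit unit) throws TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!done) {
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("commit still pending");
            }
            pollOnce.run();
        }
        return null;
    }

    @Override
    public boolean isDone() {
        return done;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false; // cancellation is left out of this sketch
    }
}
```

Because complete() is reached from inside pollOnce, completedOn() is whichever thread called get(), which is the surprise described above.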
> > > -Ewen
> > >
> > >
> > > On Wed, Apr 22, 2015 at 12:29 PM, Bhavesh Mistry <
> > > mistry.p.bhav...@gmail.com
> > > > wrote:
> > >
> > > > Hi Ewen,
> > > >
> > > > The only time I can think of where the application needs to know whether
> > > > the offset was committed is during graceful shutdown and/or in a
> > > > Runtime.addShutdownHook() hook, so that the consumer application does not
> > > > get duplicate records upon restart and does not have to deal with
> > > > eliminating already-processed offsets. The only thing the consumer
> > > > application will have to handle is a failure to commit the offset after XX
> > > > retries. Or would you prefer that the application manage this last offset
> > > > commit when the offset cannot be committed due to a failure, connection
> > > > timeout, or any other failure case?
> > > >
> > > > Thanks,
> > > > Bhavesh
> > > >
> > > >
> > > >
> > > > On Wed, Apr 22, 2015 at 11:20 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > > > I second Guozhang's proposal. I do think we need the callback. The current
> > > > > state is that for async commits you actually don't know if it succeeded.
> > > > > However there is a totally valid case where you do need to know if it
> > > > > succeeded but don't need to block, and without the callback you are stuck.
> > > > > I think the futures will likely cause problems since blocking on the future
> > > > > precludes polling which would allow it to complete.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Wed, Apr 22, 2015 at 11:12 AM, Guozhang Wang <wangg...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Ewen,
> > > > > >
> > > > > > I share the same concern you have about 2), that with the new API
> > > sync
> > > > > > commit implementation is a bit awkward since we have a
> > > single-threaded
> > > > > > design in new consumer. The reason that we need to mute other nodes
> > > for
> > > > > > doing coordinator sync operations like join-group / offset commits /
> > > > > offset
> > > > > > fetches is to avoid long blocking due to possible "starvation" on
> > > > network
> > > > > > selector, so I think they need to be done still.
> > > > > >
> > > > > > On the other hand, I think users using the commit API will usually
> > > fall
> > > > > > into three categories:
> > > > > >
> > > > > > 1) I really care that the offsets are committed before moving on to fetch
> > > > > > more data, so I will wait FOREVER for it to complete.
> > > > > >
> > > > > > 2) I do not really care about whether it succeeds or not, so just
> > > fire
> > > > > > "commit" and let's move on; if it fails it fails (and it will be
> > > > logged).
> > > > > >
> > > > > > 3) I care if it succeeds or not, but I do not want to wait
> > > > indefinitely;
> > > > > so
> > > > > > let me know if it does not finish within some timeout or failed 
> > > > > > (i.e.
> > > > > give
> > > > > > me the exceptions / error codes) and I will handle it.
> > > > > >
> > > > > > The current API does not handle case 3) above, which sits between BLOCK
> > > > > > FOREVER and DO NOT CARE AT ALL. Most of the time people would not be very
> > > > > > explicit about the exact "timeout"; just knowing it is definite and
> > > > > > reasonably short is good enough. I think for this we probably do not need
> > > > > > an extra timeout / retry setting, but can rely on the general request retry
> > > > > > settings; similarly we probably do not need "cancel".
> > > > > >
> > > > > > So I wonder if we can do a slightly different modification to API
> > > like
> > > > > > this:
> > > > > >
> > > > > > void commit(Map<TopicPartition, Long> offsets, CommitType type,
> > > > > > ConsumerCommitCallback callback);
> > > > > >
> > > > > > For case 1) people call "commit(offsets)" which will block forever until it
> > > > > > succeeds;
> > > > > >
> > > > > > For case 2) people call "commit(offsets, async)" which will return
> > > > > > immediately, with no callback on completion;
> > > > > >
> > > > > > For case 3) people call "commit(offsets, async, callback)", and the
> > > > > > callback will be executed when it finishes or the number of request retries
> > > > > > has been exhausted.
> > > > > >
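A minimal sketch of the three call shapes under the callback-only signature. It is a simulation with assumptions spelled out: CallbackOnlyConsumer, CommitCallback and the constructor arguments are hypothetical, partitions are plain strings, failures are simulated by a counter, and the async path runs inline here where a real client would complete it from poll().

```java
import java.util.Map;

enum CommitType { SYNC, ASYNC }

// Hypothetical mirror of the ConsumerCommitCallback discussed in this thread.
interface CommitCallback {
    void onCompletion(Exception exception);
}

final class CallbackOnlyConsumer {
    private final int maxRetries;     // stands in for the general request retry setting
    private int failuresRemaining;    // simulated broker failures still to come

    CallbackOnlyConsumer(int maxRetries, int simulatedFailures) {
        this.maxRetries = maxRetries;
        this.failuresRemaining = simulatedFailures;
    }

    // Simulates sending one commit request; fails until the failure budget is used up.
    private boolean sendOnce(Map<String, Long> offsets) {
        if (failuresRemaining > 0) {
            failuresRemaining--;
            return false;
        }
        return true;
    }

    // Case 1: block until the commit succeeds.
    void commit(Map<String, Long> offsets) {
        commit(offsets, CommitType.SYNC, null);
    }

    // Case 2: fire and forget.
    void commit(Map<String, Long> offsets, CommitType type) {
        commit(offsets, type, null);
    }

    // Case 3 when type is ASYNC and a callback is supplied.
    void commit(Map<String, Long> offsets, CommitType type, CommitCallback callback) {
        if (type == CommitType.SYNC) {
            while (!sendOnce(offsets)) {
                // keep retrying until success
            }
            if (callback != null) callback.onCompletion(null);
            return;
        }
        boolean ok = false;
        for (int attempt = 0; attempt <= maxRetries && !ok; attempt++) {
            ok = sendOnce(offsets);
        }
        if (callback != null) {
            callback.onCompletion(ok ? null : new RuntimeException("retries exhausted"));
        }
    }
}
```

Note that nothing here lets the caller pick its own deadline; the bound comes entirely from maxRetries, which is the tradeoff against the Future variant.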
> > > > > > This API will make much smaller changes to the current
> > > implementations
> > > > as
> > > > > > well. Of course if we have a common scenario where users would 
> > > > > > really
> > > > > care
> > > > > > about the exact timeout for async commits, then Future may be a good
> > > > > > approach.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin
> > > > <j...@linkedin.com.invalid
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Ewen,
> > > > > > >
> > > > > > > This makes sense. People usually do not want to stop consuming when
> > > > > > > committing offsets.
> > > > > > >
> > > > > > > One corner case with async commit and retries that I am thinking about is
> > > > > > > that two offset commits could interleave with each other, and that
> > > > > > > might create a problem. Like you said, maybe we can cancel the previous one.
> > > > > > > Another question is whether the future mechanism will only be applied to
> > > > > > > auto commit or will also be used in manual commit. In the new
> > > > > > > consumer we allow the user to provide an offset map for offset commit. Simply
> > > > > > > canceling a previous pending offset commit does not seem ideal in
> > > > > > > this case because the two commits could be for different partitions.
> > > > > > >
> > > > > > > Thanks.
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > On 4/14/15, 4:31 PM, "Ewen Cheslack-Postava" <e...@confluent.io>
> > > > > wrote:
> > > > > > >
> > > > > > > >I'd like to get some feedback on changing the offset commit API 
> > > > > > > >in
> > > > the
> > > > > > new
> > > > > > > >consumer. Since this is user-facing API I wanted to make sure 
> > > > > > > >this
> > > > > gets
> > > > > > > >better visibility than the JIRA (
> > > > > > > >https://issues.apache.org/jira/browse/KAFKA-2123) might.
> > > > > > > >
> > > > > > > >The motivation is to make it possible to do async commits but be
> > > > able
> > > > > to
> > > > > > > >tell when the commit completes/fails. I'm suggesting changing the
> > > > API
> > > > > > from
> > > > > > > >
> > > > > > > >void commit(Map offsets, CommitType)
> > > > > > > >
> > > > > > > >to
> > > > > > > >
> > > > > > > >Future<Void> commit(Map<TopicPartition, Long> offsets,
> > > > > > > >ConsumerCommitCallback callback);
> > > > > > > >
> > > > > > > >which matches the approach used for the producer. The
> > > > > > > >ConsumerCommitCallback only has one method:
> > > > > > > >
> > > > > > > >public void onCompletion(Exception exception);
> > > > > > > >
> > > > > > > >This enables a few different use cases:
> > > > > > > >
> > > > > > > >* Blocking commit via Future.get(), and blocking with timeouts 
> > > > > > > >via
> > > > > > > >Future.get(long, TimeUnit)
> > > > > > > >* See exceptions via the future (see discussion of retries below)
> > > > > > > >* Callback-based notification so you can keep processing messages
> > > > and
> > > > > > only
> > > > > > > >take action if something goes wrong, takes too long, etc. This is
> > > > the
> > > > > > use
> > > > > > > >case that motivated
> > > > > > > >* Fire and forget commits via a shorthand commit() API and
> > > ignoring
> > > > > the
> > > > > > > >resulting future.
> > > > > > > >
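A minimal sketch of the proposed Future + callback shape, showing how the use cases listed above fall out of one signature. Assumptions: SketchConsumer and CommitCallback are hypothetical stand-ins, partitions are plain strings, and the commit completes immediately (success or failure chosen by a constructor flag) rather than going over the network.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical mirror of ConsumerCommitCallback with its single method.
interface CommitCallback {
    void onCompletion(Exception exception);
}

final class SketchConsumer {
    private final boolean failCommits;   // simulation switch, not a real setting

    SketchConsumer(boolean failCommits) {
        this.failCommits = failCommits;
    }

    Future<Void> commit(Map<String, Long> offsets, CommitCallback callback) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        Exception error = failCommits ? new IllegalStateException("commit failed") : null;

        // Callback-based notification: invoked once, with null on success.
        if (callback != null) {
            callback.onCompletion(error);
        }
        // The same outcome is visible through the future for blocking callers.
        if (error == null) {
            result.complete(null);
        } else {
            result.completeExceptionally(error);
        }
        return result;
    }
}
```

Blocking callers use get() or get(long, TimeUnit) on the result, callback users pass a CommitCallback and drop the future, and fire-and-forget passes null and ignores the return value.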
> > > > > > > >One big difference between this and the producer API is that 
> > > > > > > >there
> > > > > isn't
> > > > > > > >any result (except maybe an exception) from commitOffsets. This
> > > > leads
> > > > > to
> > > > > > > >the somewhat awkward Future<Void> signature. I personally prefer
> > > > that
> > > > > to
> > > > > > > >the sync/async flag, especially since it also provides a
> > > > non-blocking
> > > > > > > >interface for checking whether the commit is complete.
> > > > > > > >
> > > > > > > >I posted a WIP patch to the JIRA. In the process of making it I found a
> > > > > > > >few issues that might be worth discussing:
> > > > > > > >
> > > > > > > >1. Retries. In the old approach, this was trivial since it only
> > > > > applied
> > > > > > to
> > > > > > > >synchronous calls, so we could just loop until the request was
> > > > > > successful.
> > > > > > > >Do we want to start introducing a retries mechanism here, and
> > > should
> > > > > it
> > > > > > > >apply to all types of requests or are we going to end up with a
> > > > couple
> > > > > > of
> > > > > > > >different retry settings for specific cases, like offset commit? The WIP
> > > > > > > >patch allows errors to bubble up through the Future on the first
> > > > > > failure,
> > > > > > > >which right now can cause some tests to fail transiently (e.g.
> > > > > consumer
> > > > > > > >bounce test).
> > > > > > > >
> > > > > > > >I think some sort of retry mechanism, even if it's an internal
> > > > > constant
> > > > > > > >rather than configurable, is probably the right solution, but I
> > > want
> > > > > to
> > > > > > > >figure out how broadly they should apply. I think adding them 
> > > > > > > >only
> > > > for
> > > > > > > >offset commits isn't hard.
> > > > > > > >
> > > > > > > >2. The Future implementation is a bit weird because the consumer
> > > > > doesn't
> > > > > > > >have a dedicated IO thread. My only concern is that this could
> > > lead
> > > > to
> > > > > > > >some
> > > > > > > >unintuitive results based on the current implementation because
> > > the
> > > > > way
> > > > > > > >this works is to just run poll() in the thread calling
> > > Future.get(),
> > > > > but
> > > > > > > >it
> > > > > > > >mutes all non-coordinator nodes which means other processing is
> > > > > > > >effectively
> > > > > > > >paused. If you're processing offset commits in a separate thread
> > > > from
> > > > > > your
> > > > > > > >main consumer thread that's calling poll(), you might just end up blocking
> > > > > > > >the main thread while waiting on the Future. Then again, I'm not
> > > > sure
> > > > > > the
> > > > > > > >other nodes really even need to be muted -- maybe Jay or Guozhang
> > > > have
> > > > > > > >ideas on this?
> > > > > > > >
> > > > > > > >3. Should the future be cancellable? This probably isn't hard to
> > > > > > > >implement,
> > > > > > > >but I'm not sure we should even bother. On the one hand it could
> > > be
> > > > > > nice,
> > > > > > > >especially if you have an old commit request that you want superseded by
> > > > > > > >a new one with updated offsets. On the other hand, if the request
> > > > has
> > > > > > > >already been sent out, cancelling it won't accomplish anything. I
> > > > > think
> > > > > > > >the
> > > > > > > >only case this is useful is when there are retries.
> > > > > > > >
> > > > > > > >Thoughts?
> > > > > > > >
> > > > > > > >--
> > > > > > > >Thanks,
> > > > > > > >Ewen
> > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> > 
> > 
> > 
> > -- 
> > -- Guozhang
> 
> -- 
> Joel
