I second Guozhang's proposal. I do think we need the callback. Currently, with async commits you don't actually know whether the commit succeeded. However, there is a perfectly valid case where you do need to know whether it succeeded but don't want to block, and without the callback you are stuck. I think futures will likely cause problems, since blocking on the future precludes the polling that would allow it to complete.
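Guozhang's proposal (quoted below) could be sketched roughly as follows. Everything here is hypothetical: `ToyConsumer`, `CommitModesDemo`, string partition keys such as `"events-0"`, and these minimal `CommitType` / `ConsumerCommitCallback` definitions stand in for the real consumer classes, and commit failures are not modeled. The sketch shows the property the callback relies on: in a single-threaded consumer the callback runs inside `poll()`, on the caller's own thread, so the caller learns the outcome without blocking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-ins for the types named in the thread (not the real Kafka classes).
enum CommitType { SYNC, ASYNC }

interface ConsumerCommitCallback {
    // exception is null when the commit succeeded
    void onCompletion(Exception exception);
}

// Toy single-threaded consumer: an async commit only completes when poll() runs.
// Partitions are plain strings such as "events-0"; failures are not modeled.
class ToyConsumer {
    private final Map<String, Long> committed = new HashMap<>();
    private final Queue<Runnable> inFlight = new ArrayDeque<>();

    // Case 1: block until the offsets are committed.
    void commit(Map<String, Long> offsets) {
        commit(offsets, CommitType.SYNC, null);
    }

    // Case 2: fire and forget.
    void commit(Map<String, Long> offsets, CommitType type) {
        commit(offsets, type, null);
    }

    // Case 3: async, but report the outcome through the callback.
    void commit(Map<String, Long> offsets, CommitType type, ConsumerCommitCallback callback) {
        Map<String, Long> snapshot = new HashMap<>(offsets);
        inFlight.add(() -> {
            committed.putAll(snapshot);
            if (callback != null) {
                callback.onCompletion(null);
            }
        });
        if (type == CommitType.SYNC) {
            // A sync commit drives the I/O loop itself until nothing is in flight.
            while (!inFlight.isEmpty()) {
                poll();
            }
        }
    }

    // Stand-in for one round of network I/O: completes pending commit responses
    // and runs their callbacks on the calling thread.
    void poll() {
        Runnable response;
        while ((response = inFlight.poll()) != null) {
            response.run();
        }
    }

    Long committedOffset(String partition) {
        return committed.get(partition);
    }

    int pendingCommits() {
        return inFlight.size();
    }
}

class CommitModesDemo {
    static List<String> run() {
        ToyConsumer consumer = new ToyConsumer();
        List<String> events = new ArrayList<>();
        Map<String, Long> offsets = new HashMap<>();

        offsets.put("events-0", 42L);
        consumer.commit(offsets); // case 1: returns only after the commit completes
        events.add("sync committed=" + consumer.committedOffset("events-0"));

        offsets.put("events-0", 50L);
        consumer.commit(offsets, CommitType.ASYNC); // case 2: nobody hears about the outcome
        offsets.put("events-0", 60L);
        consumer.commit(offsets, CommitType.ASYNC, // case 3: outcome reported via callback
            e -> events.add(e == null ? "callback ok" : "callback failed: " + e));
        events.add("pending before poll=" + consumer.pendingCommits());

        consumer.poll(); // both async commits complete here; the callback fires on this thread
        events.add("after poll committed=" + consumer.committedOffset("events-0"));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `CommitModesDemo.run()` records the sync commit first, then two async commits that stay pending until `poll()` is called, at which point both complete and the callback fires.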
-Jay

On Wed, Apr 22, 2015 at 11:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Ewen,
>
> I share the same concern you have about 2): with the new API, the sync commit implementation is a bit awkward since the new consumer has a single-threaded design. The reason we need to mute other nodes for coordinator sync operations like join-group / offset commits / offset fetches is to avoid long blocking due to possible "starvation" on the network selector, so I think that still needs to be done.
>
> On the other hand, I think users of the commit API will usually fall into three categories:
>
> 1) I really care that the offsets are committed before moving on to fetch more data, so I will wait FOREVER for it to complete.
>
> 2) I do not really care whether it succeeds or not, so just fire "commit" and let's move on; if it fails it fails (and it will be logged).
>
> 3) I care whether it succeeds or not, but I do not want to wait indefinitely; so let me know if it does not finish within some timeout or fails (i.e. give me the exceptions / error codes) and I will handle it.
>
> The current API does not handle case 3), which sits between BLOCK FOREVER and DO NOT CARE AT ALL. Most of the time people would not be very explicit about the exact timeout; just knowing it is bounded and reasonably short is good enough. I think for this we probably do not need extra timeout / retry settings, but can rely on the general request retry settings; similarly, we probably do not need "cancel".
>
> So I wonder if we could make a slightly different modification to the API, like this:
>
> void commit(Map<TopicPartition, Long> offsets, CommitType type, ConsumerCommitCallback callback);
>
> For case 1), people call "commit(offsets)", which will block forever until it succeeds.
>
> For case 2), people call "commit(offsets, async)", which will return immediately, with no callback when it finishes.
>
> For case 3), people call "commit(offsets, async, callback)", and the callback will be executed when the commit finishes or the configured number of request retries has been exhausted.
>
> This API would also require much smaller changes to the current implementation. Of course, if there is a common scenario where users really care about the exact timeout for async commits, then a Future may be a good approach.
>
> Guozhang
>
>
> On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>
> > Hey Ewen,
> >
> > This makes sense. People usually do not want to stop consuming when committing offsets.
> >
> > One corner case I am thinking about for async commits with retries is that two offset commits could interleave with each other, which might create problems. Like you said, maybe we can cancel the previous one.
> >
> > Another question is whether the future mechanism will only be applied to auto commit or will also be used for manual commit. In the new consumer we allow users to provide an offset map for an offset commit, so simply canceling a previous pending offset commit does not seem ideal in this case, because the two commits could be for different partitions.
> >
> > Thanks.
> >
> > Jiangjie (Becket) Qin
> >
> > On 4/14/15, 4:31 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> >
> > >I'd like to get some feedback on changing the offset commit API in the new consumer.
> > >Since this is a user-facing API, I wanted to make sure this gets better visibility than the JIRA (https://issues.apache.org/jira/browse/KAFKA-2123) might.
> > >
> > >The motivation is to make it possible to do async commits but still be able to tell when the commit completes or fails. I'm suggesting changing the API from
> > >
> > >void commit(Map offsets, CommitType)
> > >
> > >to
> > >
> > >Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback);
> > >
> > >which matches the approach used for the producer. The ConsumerCommitCallback only has one method:
> > >
> > >public void onCompletion(Exception exception);
> > >
> > >This enables a few different use cases:
> > >
> > >* Blocking commit via Future.get(), and blocking with timeouts via Future.get(long, TimeUnit)
> > >* See exceptions via the future (see discussion of retries below)
> > >* Callback-based notification so you can keep processing messages and only take action if something goes wrong, takes too long, etc. This is the use case that motivated this proposal.
> > >* Fire-and-forget commits via a shorthand commit() API, ignoring the resulting future.
> > >
> > >One big difference between this and the producer API is that there isn't any result (except maybe an exception) from commitOffsets. This leads to the somewhat awkward Future<Void> signature. I personally prefer that to the sync/async flag, especially since it also provides a non-blocking interface for checking whether the commit is complete.
> > >
> > >I posted a WIP patch to the JIRA. In the process of making it I found a few issues that might be worth discussing:
> > >
> > >1. Retries. In the old approach this was trivial, since retries only applied to synchronous calls, so we could just loop until the request was successful.
> > >Do we want to start introducing a retry mechanism here, and should it apply to all types of requests, or are we going to end up with a couple of different retry settings for specific cases, like offset commit? The WIP patch allows errors to bubble up through the Future on the first failure, which right now can cause some tests to fail transiently (e.g. the consumer bounce test).
> > >
> > >I think some sort of retry mechanism, even if it's an internal constant rather than configurable, is probably the right solution, but I want to figure out how broadly it should apply. I think adding retries only for offset commits isn't hard.
> > >
> > >2. The Future implementation is a bit weird because the consumer doesn't have a dedicated IO thread. My only concern is that this could lead to some unintuitive results with the current implementation: it works by just running poll() in the thread calling Future.get(), but it mutes all non-coordinator nodes, which means other processing is effectively paused. If you're processing offset commits in a separate thread from your main consumer thread that's calling poll(), you might just end up blocking the main thread while waiting on the Future. Then again, I'm not sure the other nodes really even need to be muted. Maybe Jay or Guozhang have ideas on this?
> > >
> > >3. Should the future be cancellable? This probably isn't hard to implement, but I'm not sure we should even bother. On the one hand it could be nice, especially if you have an old commit request that you want superseded by a new one with updated offsets. On the other hand, if the request has already been sent out, cancelling it won't accomplish anything. I think the only case where this is useful is when there are retries.
> > >
> > >Thoughts?
> > >
> > >--
> > >Thanks,
> > >Ewen
> >
> >
>
>
> --
> -- Guozhang
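The Future-based design Ewen describes in items 1-3, and Jay's objection to it, both hinge on the consumer having no dedicated I/O thread: a commit future completes only if somebody keeps calling `poll()`. A minimal sketch under that assumption follows. Only `java.util.concurrent.Future` is a real interface; `PollDrivenConsumer`, `PollDrivenCommitFuture`, `FutureCommitDemo`, and `MAX_COMMIT_RETRIES` are hypothetical, and muting of non-coordinator nodes is not modeled. Both `get` variants pump `poll()` on the calling thread, a failed attempt is retried up to a fixed internal limit (item 1), and `cancel()` always declines (item 3).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical toy consumer with no background I/O thread: an in-flight commit only
// makes progress when some thread calls poll(). Node muting is not modeled.
class PollDrivenConsumer {
    static final int MAX_COMMIT_RETRIES = 3; // hypothetical internal constant, not a Kafka config
    static final class CommitRequest {
        int attempts;
        boolean done;
        Exception error;
    }
    private final Queue<CommitRequest> inFlight = new ArrayDeque<>();
    private int transientFailures; // how many upcoming attempts should fail
    PollDrivenConsumer(int transientFailures) {
        this.transientFailures = transientFailures;
    }
    PollDrivenCommitFuture commitAsync() {
        CommitRequest request = new CommitRequest();
        inFlight.add(request);
        return new PollDrivenCommitFuture(this, request);
    }

    // One round of simulated network I/O: each in-flight request gets one response.
    void poll() {
        int pending = inFlight.size();
        for (int i = 0; i < pending; i++) {
            CommitRequest request = inFlight.remove();
            request.attempts++;
            if (transientFailures == 0) {
                request.done = true; // success
                continue;
            }
            transientFailures--;
            if (request.attempts > MAX_COMMIT_RETRIES) {
                // The first attempt and all MAX_COMMIT_RETRIES retries failed.
                request.error = new RuntimeException("commit failed after " + request.attempts + " attempts");
                request.done = true;
            } else {
                inFlight.add(request); // retry on the next poll()
            }
        }
    }
}

// Future<Void> whose blocking methods drive poll() on the calling thread.
class PollDrivenCommitFuture implements Future<Void> {
    private final PollDrivenConsumer consumer;
    private final PollDrivenConsumer.CommitRequest request;
    PollDrivenCommitFuture(PollDrivenConsumer consumer, PollDrivenConsumer.CommitRequest request) {
        this.consumer = consumer;
        this.request = request;
    }
    @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } // never cancels
    @Override public boolean isCancelled() { return false; }
    @Override public boolean isDone() { return request.done; }

    @Override public Void get() throws ExecutionException {
        while (!request.done) consumer.poll(); // nothing else will complete the request
        return result();
    }

    @Override public Void get(long timeout, TimeUnit unit) throws ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!request.done) {
            if (System.nanoTime() - deadline >= 0) throw new TimeoutException("commit still pending");
            consumer.poll();
        }
        return result();
    }

    private Void result() throws ExecutionException {
        if (request.error != null) throw new ExecutionException(request.error);
        return null;
    }
}

class FutureCommitDemo {
    static String run(int transientFailures) {
        PollDrivenCommitFuture future = new PollDrivenConsumer(transientFailures).commitAsync();
        boolean doneBeforeGet = future.isDone(); // false: nobody has polled yet
        try {
            future.get(5, TimeUnit.SECONDS);
            return "ok doneBeforeGet=" + doneBeforeGet;
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        }
    }
}
```

For example, `FutureCommitDemo.run(2)` succeeds on the third attempt, while `run(4)` exhausts the first attempt plus three retries and surfaces the error through an `ExecutionException`. Because `get()` is what drives `poll()`, a caller that waits without polling (say, by spinning on `isDone()`) never sees the future complete, which is the problem Jay raises; and each `poll()` also advances every other in-flight request, not just the awaited one.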