Just a minor correction, but #2 is KAFKA-2168 and #3 is KAFKA-2123. For #1,
I think there should be at least a minimal effort to make poll() respect
the timeout (with the understanding that spurious wakeups can happen). I
think this really just means calling NetworkClient.poll() in a loop and
checking whether there are any results to return. An additional
complication is that long poll timeouts can prevent heartbeats or
auto-committed offsets from being sent. This means the maximum time the
consumer should actually block in NetworkClient.poll() should be the
minimum of the poll timeout, the heartbeat interval, and the auto-commit
interval (if auto-commit is enabled). I attempted to handle this in my
patch for KAFKA-2168, which is pending review.
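To make that concrete, here's a rough sketch of the loop I have in mind
(the interfaces below are stand-ins, not the actual NetworkClient/Fetcher
signatures):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    // Stand-in interfaces; the real consumer internals look different.
    interface Client { void poll(long maxBlockMs); }
    interface Fetcher<V> { Map<String, List<V>> fetchedRecords(); }

    class PollLoopSketch<V> {
        private final Client client;
        private final Fetcher<V> fetcher;
        private final long heartbeatIntervalMs;
        private final long autoCommitIntervalMs; // <= 0: auto-commit disabled

        PollLoopSketch(Client client, Fetcher<V> fetcher,
                       long heartbeatIntervalMs, long autoCommitIntervalMs) {
            this.client = client;
            this.fetcher = fetcher;
            this.heartbeatIntervalMs = heartbeatIntervalMs;
            this.autoCommitIntervalMs = autoCommitIntervalMs;
        }

        Map<String, List<V>> poll(long timeoutMs) {
            long deadline = System.currentTimeMillis() + timeoutMs;
            do {
                long remaining = Math.max(0, deadline - System.currentTimeMillis());
                // Cap the block time so heartbeats and auto-commits still
                // go out on schedule during a long poll.
                long maxBlock = Math.min(remaining, heartbeatIntervalMs);
                if (autoCommitIntervalMs > 0)
                    maxBlock = Math.min(maxBlock, autoCommitIntervalMs);
                client.poll(maxBlock); // may send heartbeats/commits internally
                Map<String, List<V>> records = fetcher.fetchedRecords();
                if (!records.isEmpty())
                    return records; // data arrived: return before the timeout
            } while (System.currentTimeMillis() < deadline);
            return Collections.emptyMap(); // timeout elapsed with no data
        }
    }

A nice side effect is that a timeout of 0 becomes non-blocking, which
matches Jay's suggestion below on the timeout semantics.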
I tend to agree with Jay on 1b, though. Trying to leave the JoinGroup
pending could get tricky.

On Tue, Jun 9, 2015 at 4:25 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> My two cents:
>
> Overall I think our focus as far as extensions go should be on (1) things
> which would be incompatible if changed later and (2) operationalization.
> There are lots of new apis that could be nice to have, but if adding them
> later will just be an api addition we should hold off and get out what we
> have; if they will be ugly to add later then let's try to get them in now.
>
> 1a. I don't think there is too much value in attempting to avoid spurious
> wakeups. Several people have asked about this, but I think the only usage
> that makes sense is in an event loop, since you can always get 0 events
> due to a timeout. I think we should just document that the timeout is
> there for guidance and not strictly enforced. This is intuitive since no
> timeout like this is ever really strictly enforced (if there is a 30
> second GC pause we will be off by 30 seconds regardless of how diligent
> we are in our own code).
> 1b. Same here. Trying to return in the middle of a JoinGroup would be
> very complex, so I think we should just document this.
> Also: the docs currently say that a timeout of 0 blocks forever, but I
> think that might be wrong. Regardless, for consistency a timeout of 0
> should be non-blocking and a timeout of MAX_INT should be used to block
> "forever".
>
> 2. Here is my understanding of this one. There was originally some
> interest in tightening up the locking to allow more parallelism in
> consumer usage. Personally I think this adds a lot of complexity and
> would prefer to avoid it. My concern is primarily around implementation
> complexity: without a very careful, well-abstracted threading model, a
> big chunk of code with lots of ad hoc locking, even if perfect when
> initially written, is just very hard to maintain. I also think the
> single-threaded usage pattern is easier for the end user and likely
> faster, though there are some downsides. After some discussion there was
> a second proposal to instead leave the locking as is and add a wakeup
> call. The use case for this would be something like quickly interrupting
> and shutting down a thread that is in its event loop. I think this makes
> sense, though we could probably release a v1 without it if need be. An
> open question here is whether interrupt needs to work for all calls
> (e.g. commit) or just poll, with other calls having a de facto timeout
> from the request timeout and retry count. I would vote for the latter if
> it helps with implementation simplicity.
>
> 3. I think both these extensions make sense. It would be nice to get them
> into the first release to avoid api churn.
>
> 4. Not sure I fully understand.
>
> 5a. The rationale for doing seekToBeginning and seekToEnd was that we
> felt we might need to think a little bit about the offset list api before
> knowing how best to expose it. We clearly need something here, but the
> hope was that the two seekTo calls were good enough to get started and we
> could wait to think it out properly before adding the more general call.
> I think the thinking was basically (a) the time mechanism for the
> intermediate segments is often wrong, (b) the protocol that does the
> fetch is quite ad hoc and perhaps there is more per-segment info that
> should be in that request, and (c) the api in the simple consumer is very
> unintuitive. If we are adding an end-state solution
>
> 5b. Not sure, but can we add it later? How would MirrorMaker work?
>
>
> On Tue, Jun 9, 2015 at 1:12 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > This email is to kick off some discussion around the changes we want to
> > make to the new consumer APIs as well as their semantics. Here is a
> > non-comprehensive list of items on my mind:
> >
> > 1. Poll(timeout): the current definition of the timeout states "The
> > time, in milliseconds, spent waiting in poll if data is not available.
> > If 0, waits indefinitely." The current implementation, however, has
> > different semantics than stated, for example:
> >
> > a) poll(timeout) can return before "timeout" has elapsed with empty
> > consumed data.
> > b) poll(timeout) can return after more than "timeout" has elapsed due
> > to blocking events like join-group, coordinator discovery, etc.
> >
> > We should think a bit more about what semantics we really want to
> > provide and how to provide it in the implementation.
> >
> > 2. Thread safeness: currently we have a coarse-grained locking
> > mechanism that provides thread safeness but blocks commit / position /
> > etc. calls while poll() is in process. We are considering removing the
> > coarse-grained locking, adding a Consumer.wakeup() call to break the
> > polling, and instead suggesting that users have one consumer client per
> > thread, which aligns with the design of a single-threaded consumer
> > (KAFKA-2123).
> >
> > 3. Commit(): we want to improve the async commit calls to add a
> > callback handler invoked when the commit completes, and to guarantee
> > ordering of commit calls with retry policies (KAFKA-2168). In addition,
> > we want to extend the API to expose attaching / fetching offset
> > metadata stored in the Kafka offset manager.
> >
> > 4. OffsetFetchRequest: currently when handling an OffsetCommitRequest
> > we check the generation id and the assigned partitions before accepting
> > the request if the group is using Kafka for partition management, but
> > for OffsetFetchRequest we cannot do this check since the request does
> > not include groupId / consumerId information. Do people think this is
> > OK, or should we add the check as we did for OffsetCommitRequest?
> >
> > 5. New APIs: there are some other requests to add:
> >
> > a) offsetsBeforeTime(timestamp): or would seekToEnd and seekToBeginning
> > be sufficient?
> >
> > b) listTopics(): or should we just require users to use AdminUtils for
> > such operations?
> >
> > There may be other issues that I have missed here, so please bring them
> > up if you have thought about anything else.
> >
> > -- Guozhang
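To make a couple of the other points concrete: for #2, the
one-consumer-per-thread event loop with the proposed wakeup() call could
look roughly like this (I'm assuming wakeup() breaks a blocked poll() by
raising an exception; that detail isn't settled, so the catch below is
deliberately generic):

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Sketch of the single-threaded usage pattern: the consumer is owned
    // by one thread, and wakeup() is the only call made from outside it.
    public class ConsumerLoop implements Runnable {
        private final KafkaConsumer<String, String> consumer;
        private final AtomicBoolean running = new AtomicBoolean(true);

        public ConsumerLoop(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            try {
                while (running.get()) {
                    // An empty result (timeout or spurious wakeup) is
                    // harmless in an event loop; we just poll again.
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records)
                        process(record);
                }
            } catch (RuntimeException e) {
                if (running.get())
                    throw e; // a real error, not a shutdown wakeup
            } finally {
                consumer.close();
            }
        }

        private void process(ConsumerRecord<String, String> record) {
            System.out.printf("%s-%d@%d%n",
                    record.topic(), record.partition(), record.offset());
        }

        // Called from another thread to shut the loop down quickly.
        public void shutdown() {
            running.set(false);
            consumer.wakeup(); // assumed: interrupts a blocked poll()
        }
    }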
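For #3, one possible shape for the commit callback (every name here is a
placeholder; the real API is whatever lands with KAFKA-2168):

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical callback for async commits. Callbacks would fire in
    // the same order as the commit calls, per the ordering/retry
    // guarantees discussed above.
    interface CommitCallback {
        // exception is null on success, non-null once retries are exhausted
        void onComplete(Map<TopicPartition, OffsetMetadata> offsets,
                        Exception exception);
    }

    // Hypothetical carrier for the offset plus the user metadata stored
    // in the Kafka offset manager.
    interface OffsetMetadata {
        long offset();
        String metadata();
    }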