Okay, now I understand that a third rebalance timeout is only useful when users do not care about the process timeout (i.e. they just set it to infinity) but still want the rebalance to finish in a reasonable amount of time, even if some consumers are delayed in processing before their next "poll".
And I also agree with your previous comment that for this case, users still need to have some knowledge about the worst-case processing latency of a batch; and since they need to learn that anyway, why not just have them set the process timeout accordingly. So this extra config may not be that useful in terms of "incremental benefits", and I am still in favor of keeping the current KIP as is.

Grant, Onur: do you have any other thoughts? If not, I would suggest Jason start the voting thread some time toward the end of this week.

Guozhang

On Thu, Jun 2, 2016 at 5:14 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Guozhang,
>
> I'm actually not too concerned about the time spent in the rebalance callback specifically. Both it and regular processing time in the poll loop will delay the rebalance and keep joined consumers idle. However, if we expose the rebalance timeout, it would give users the option to effectively disable the process timeout while still keeping a maximum bound on the rebalance time. If the consumer cannot complete its processing fast enough and rejoin, then it would be evicted. This provides something like (2), since the other consumers in the group would be able to complete the rebalance and resume work while the evicted consumer would have to roll back its progress. This is not too different from rebalancing in the background, which would also typically cause commit failure and rollback (though at least the consumer stays in the group).
>
> Now that I'm thinking about it more, I'm not sure this would be a great facility to depend on in practice. It might be OK if just one or two of the consumers fall out of the group during the rebalance, but if half the group is regularly getting evicted, it would be a problem. So even if we expose the rebalance timeout, the user is still going to have to set it with some idea in mind of how long processing should take.
> Thanks,
> Jason
>
> On Thu, Jun 2, 2016 at 2:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Jason,
> >
> > With the current usage pattern of:
> >
> > while (...) {
> >     consumer.poll(/* where rebalance happens */)
> >     // process messages
> > }
> >
> > ----------
> >
> > since the rebalance still happens on the caller thread, not the background thread, if the coordinator decides to rebalance while the user thread is still processing messages, there are no other options: we are forced to go with 1), right? I think your / Onur's point here, which I agree with, is that by reusing the process timeout as the rebalance timeout, if the rebalance callback could take longer than processing a batch, users need to set the timeout value to the higher of the two, i.e. the callback latency, which will make detection of stalled processing less effective, right?
> >
> > As I mentioned in my previous email, I feel that this case of "the callback function taking longer than processing a batch" would not be frequent in practice, and the process timeout would usually be a good upper bound on the callback latency. If that is true, I'd suggest we keep the current proposal and not add a third timeout config to cover this case.
> >
> > Guozhang
> >
> > On Thu, Jun 2, 2016 at 10:40 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > > Hey Guozhang,
> > >
> > > I think the problem is that users may not want to sacrifice rebalance latency because of uncertainty around processing time. As soon as a rebalance begins, there are basically two choices:
> > >
> > > 1. Block the rebalance until all consumers have finished their current processing.
> > > 2. Let all consumers rebalance and "roll back" any processing that could not be committed before the rebalance completes.
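[Editorial illustration of why option (2) above forces a rollback: once the group completes a rebalance without a slow member, the group generation is bumped, and the slow member's eventual offset commit fails the generation check. A minimal toy model of that check, with simplified names and logic; the real check lives on the group coordinator:]

```java
// Toy model of the commit-time generation check behind option (2).
// Everything here is illustrative; it is not the real client or
// coordinator code.
public class RebalanceChoices {
    static int groupGeneration = 1;

    // A commit is only accepted if the member still belongs to the
    // current generation of the group.
    static boolean tryCommit(int memberGeneration) {
        return memberGeneration == groupGeneration;
    }

    public static void main(String[] args) {
        int myGeneration = groupGeneration; // member joined in generation 1
        groupGeneration++;                  // group rebalanced without us while we processed
        System.out.println(tryCommit(myGeneration)); // false: progress must be rolled back
    }
}
```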
> > > If you choose option (1), then you have an incentive to keep a relatively tight bound on process.timeout.ms in order to reduce the worst-case idle time during a rebalance. But if you fail to set it high enough, then you'll get spurious rebalances during normal processing. I think Onur is saying that this still sort of sucks for users. On the other hand, if (2) is acceptable, then users will have more freedom to err on the high side when setting process.timeout.ms, or even disable it entirely. They will have to deal with rolling back any progress which cannot be committed after the rebalance completes, but maybe this is less of a problem for some users?
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Wed, Jun 1, 2016 at 10:23 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hi Onur, Jason:
> > > >
> > > > Here are some thoughts about reusing the process timeout as a server-side rebalance timeout. First of all, my understanding is that:
> > > >
> > > > 1) The session timeout is for detecting consumer crashes / hard failures (in this case the heartbeat thread will be dead as well, hence the coordinator realizes it within the session timeout).
> > > >
> > > > 2) The process timeout is for checking the liveness of the user thread that calls the consumer and does the processing: when no consumer calls are made within the process timeout, the heartbeat thread stops working and hence it will be detected by the coordinator.
> > > >
> > > > 3) A potential server-side rebalance timeout would be used to detect consumer liveness during the rebalance period, in which the user thread is tied up with the "poll" call and also the callback function, to prevent a slow / stalled consumer in its rebalance callback from causing the rebalance to take forever.
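[The division of labor among timeouts 1)-3) enumerated above can be condensed into a toy model. The config values and method names here are illustrative only; in the real design, check 1) lives on the coordinator, check 2) in the client's background heartbeat thread, and check 3) is the hypothetical server-side rebalance timeout under discussion:]

```java
// Toy model of the three liveness checks; not real client code.
public class TimeoutModel {
    static final long SESSION_TIMEOUT_MS   = 10_000;   // 1) detects hard crashes
    static final long PROCESS_TIMEOUT_MS   = 300_000;  // 2) detects a stalled poll loop
    static final long REBALANCE_TIMEOUT_MS = 60_000;   // 3) hypothetical bound on a rebalance

    // Coordinator-side: a member is dead if no heartbeat arrives in time.
    static boolean sessionExpired(long msSinceLastHeartbeat) {
        return msSinceLastHeartbeat > SESSION_TIMEOUT_MS;
    }

    // Client-side: the heartbeat thread stops heartbeating when the user
    // thread has not called poll() within the process timeout, which
    // eventually trips the session check above.
    static boolean processStalled(long msSinceLastPoll) {
        return msSinceLastPoll > PROCESS_TIMEOUT_MS;
    }

    // Server-side: a member that has not rejoined within the rebalance
    // timeout is evicted so the rest of the group can proceed.
    static boolean evictedFromRebalance(long msSinceRebalanceStarted) {
        return msSinceRebalanceStarted > REBALANCE_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        // Heartbeats flowing, but the poll loop has been silent for 6 minutes:
        System.out.println(sessionExpired(3_000));        // false
        System.out.println(processStalled(360_000));      // true
        System.out.println(evictedFromRebalance(90_000)); // true
    }
}
```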
> > > > I think we generally have two cases in practice regarding 3) above: users either do almost nothing in the callback and hence should never be stalled (unless there is a long GC), or they do various external IOs, for maintaining their own states for example, which could take long or even cause the thread to stall. We do not need to worry too much about the former case, and as for the latter case, the process timeout value should usually be a good upper bound on the rebalance latency.
> > > >
> > > > That being said, if we observe that there is indeed a common usage where 2) and 3) require very different timeout values, enough to justify the complexity of three timeout values, we can consider adding a third one then: it is easier to add more configs later.
> > > >
> > > > What do you think?
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, May 31, 2016 at 2:35 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > >
> > > > > Hey Onur,
> > > > >
> > > > > Thanks for the detailed response. I think the problem of controlling rebalance times is the main (known) gap in the proposal as it stands.
> > > > >
> > > > > > This burden goes away if you loosen the liveness property by having a required rebalance time and optional processing time where rebalance happens in the background thread as stated in the KIP.
> > > > >
> > > > > Just to clarify, the current KIP only allows rebalances to complete in the foreground. What I suggested above in reply to Grant was that we could add a separate rebalance timeout setting; the behavior I had in mind was to let the consumer fall out of the group if the timeout is reached while the consumer is still processing.
> > > > > I was specifically trying to avoid moving the rebalance to the background thread, since this significantly increases the complexity of the implementation. We'd also have to think about compatibility a bit more. For example, what are the implications of having the rebalance listener execute in a separate thread?
> > > > >
> > > > > Putting that issue aside, I think we need to convince ourselves that a separate rebalance timeout is really necessary, since every new timeout adds some conceptual noise which all users will see. My thought in this KIP was that users who didn't want the burden of tuning the process timeout could use a relatively large value without a major impact, because group rebalances themselves will typically be infrequent. The main concern is for users who have highly variant processing times and want to ensure a tight bound on rebalance times (even if it means having to discard some processing that cannot be completed before the rebalance finishes). These users will be left trying to tune process.timeout.ms and max.poll.records, which is basically the same position they are currently in. The problem is I don't know how common this case is, so I'm not sure how it weighs against the cost of having an additional timeout that needs to be explained. We can always add the rebalance timeout later, but it will be tough to remove once it's there. All the same, I'm not that keen on another iteration of this problem, so if we believe this use case is common enough, then maybe we should add it now.
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Sat, May 28, 2016 at 3:10 AM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
> > > > >
> > > > > > Thanks for the KIP writeup, Jason.
> > > > > >
> > > > > > Before anything else, I just wanted to point out that it's worth mentioning the "heartbeat.interval.ms" consumer config in the KIP for completeness. Today this config only starts to kick in if poll is called frequently enough. A separate heartbeat thread should make this config behave more like what people would expect: a separate thread sending heartbeats at the configured interval.
> > > > > >
> > > > > > With this KIP, the relevant configs become:
> > > > > > "max.poll.records" - already exists
> > > > > > "session.timeout.ms" - already exists
> > > > > > "heartbeat.interval.ms" - already exists
> > > > > > "process.timeout.ms" - new
> > > > > >
> > > > > > After reading the KIP several times, I think it would be helpful to be more explicit about the desired outcome. Is it trying to achieve faster best/average/worst case rebalance times? Is it trying to make the clients need less configuration tuning?
> > > > > >
> > > > > > Also it seems that brokers probably still want to enforce minimum and maximum rebalance timeouts, just as with the minimum and maximum session timeouts, so DelayedJoins don't stay in purgatory indefinitely. So we'd add new "group.min.rebalance.timeout.ms" and "group.max.rebalance.timeout.ms" broker configs, which again might need to be brought up in the KIP. Let's say we add these bounds.
> > > > > > A side-effect of having broker-side bounds on rebalance timeouts, in combination with Java clients that make process timeouts the same as rebalance timeouts, is that the broker effectively dictates the max processing time allowed between poll calls. This gotcha exists right now with today's broker-side bounds on session timeouts. So I'm not really convinced that the proposal gets rid of this complication mentioned in the KIP.
> > > > > >
> > > > > > I think the main question to ask is: does the KIP actually make a difference?
> > > > > >
> > > > > > It looks like this KIP improves rebalance times specifically when the client currently has processing times large enough that larger session timeouts are forced and heartbeat intervals cannot be honored. Separating session timeouts from processing time means clients can keep their "session.timeout.ms" low so the coordinator can quickly detect process failure, and honoring a low "heartbeat.interval.ms" on the separate heartbeat thread means clients will be quickly notified of group membership and subscription changes - all without placing difficult expectations on processing time. But even so, rebalancing through the calling thread means the slowest processing client in the group will still be the rate-limiting step when looking at rebalance times.
> > > > > >
> > > > > > From a usability perspective, the burden still seems like it will be tuning the processing time to keep the "progress liveness" happy during rebalances while still having reasonable upper bounds on rebalance times.
It > > > still > > > > > > looks like users have to do almost the exact same tricks as today > > > when > > > > > the > > > > > > group membership changes due slow processing times even though > all > > > the > > > > > > consumers are alive and the topics haven't change: > > > > > > 1. Increase the rebalance timeout to give more time for record > > > > processing > > > > > > (the difference compared to today is that we bump the rebalance > > > timeout > > > > > > instead of session timeout). > > > > > > 2. Reduce the number of records handled on each iteration with > > > > > > max.poll.records. > > > > > > > > > > > > This burden goes away if you loosen the liveness property by > > having a > > > > > > required rebalance time and optional processing time where > > rebalance > > > > > > happens in the background thread as stated in the KIP. > > > > > > > > > > > > On Thu, May 26, 2016 at 12:40 PM, Jason Gustafson < > > > ja...@confluent.io> > > > > > > wrote: > > > > > > > > > > > > > Hey Grant, > > > > > > > > > > > > > > Thanks for the feedback. I'm definitely open to including > > > heartbeat() > > > > > in > > > > > > > this KIP. One thing we should be clear about is what the > behavior > > > of > > > > > > > heartbeat() should be when the group begins rebalancing. I > think > > > > there > > > > > > are > > > > > > > basically two options: > > > > > > > > > > > > > > 1. heartbeat() simply keeps heartbeating even if the group has > > > > started > > > > > > > rebalancing. > > > > > > > 2. heartbeat() completes the rebalance itself. > > > > > > > > > > > > > > With the first option, when processing takes longer than the > > > > rebalance > > > > > > > timeout, the member will fall out of the group which will cause > > an > > > > > offset > > > > > > > commit failure when it finally finishes. However, if processing > > > > > finishes > > > > > > > before the rebalance completes, then offsets can still be > > > committed. 
> > > > > > > On the other hand, if heartbeat() completes the rebalance itself, then you'll definitely see the offset commit failure for any records being processed. So the first option is sort of biased toward processing completion while the latter is biased toward rebalance completion.
> > > > > > >
> > > > > > > I'm definitely not a fan of the second option since it takes away the choice to finish processing before rejoining. However, I do see some benefit in the first option if the user wants to keep rebalance time low and doesn't mind being kicked out of the group if processing takes longer during a rebalance. This may be a reasonable tradeoff since consumer groups are presumed to be stable most of the time. A better option in that case might be to expose the rebalance timeout to the user directly, since it would allow the user to use an essentially unbounded process.timeout.ms for highly variant processing while still keeping rebalance time limited. Of course, it would be another timeout for the user to understand...
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Jason
> > > > > > >
> > > > > > > On Thu, May 26, 2016 at 8:19 AM, Grant Henke <ghe...@cloudera.com> wrote:
> > > > > > >
> > > > > > > > Hi Jason,
> > > > > > > >
> > > > > > > > Thanks for writing up a proposal (and a thorough one)! This is something that I had been thinking about this week too, as I have run into it more than a handful of times now.
> > > > > > > > I like the idea of having a larger processing timeout; that timeout in unison with max.poll.records should in many cases provide a reasonable assurance that the consumer will stay alive.
> > > > > > > >
> > > > > > > > In rejected alternatives, "Add a separate API the user can call to indicate liveness" is listed. I think a heartbeat api could be added along with these new timeout configurations and used for "advanced" use cases where the processing time could be highly variant and less predictable. I think a place where we might use the heartbeat api in Kafka is MirrorMaker.
> > > > > > > >
> > > > > > > > Today, I have seen people trying to find ways to leverage the existing api to "force" heartbeats by:
> > > > > > > >
> > > > > > > > 1. Calling poll to get the batch of records to process
> > > > > > > > 2. Calling pause on all partitions
> > > > > > > > 3. Processing the record batch
> > > > > > > > 3a. While processing, periodically calling poll (which is essentially just a heartbeat since it returns no records while paused)
> > > > > > > > 4. Committing offsets and un-pausing
> > > > > > > > 5. Repeating from 1
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Grant
> > > > > > > >
> > > > > > > > On Wed, May 25, 2016 at 6:32 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > Hi All,
> > > > > > > > >
> > > > > > > > > One of the persistent problems we see with the new consumer is the use of the session timeout in order to ensure progress.
> > > > > > > > > Whenever there is a delay in message processing which exceeds the session timeout, no heartbeats can be sent and the consumer is removed from the group. We seem to hit this problem everywhere the consumer is used (including Kafka Connect and Kafka Streams) and we don't always have a great solution. I've written a KIP to address this problem here:
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > > > > > > > > Have a look and let me know what you think.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Jason
> > > > > > > >
> > > > > > > > --
> > > > > > > > Grant Henke
> > > > > > > > Software Engineer | Cloudera
> > > > > > > > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
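[Editorial sketch of the pause-based workaround Grant outlines earlier in the thread (poll, pause, heartbeat via empty polls while processing, commit, resume). The consumer here is a toy stand-in so the sketch is self-contained; it models only the pause/poll/resume semantics and is NOT the real KafkaConsumer API:]

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Toy stand-in for the consumer, modeling just the pause/poll/resume
// behavior the workaround relies on.
class ToyConsumer {
    private final Queue<String> records = new ArrayDeque<>(List.of("r1", "r2", "r3"));
    private boolean paused = false;
    int heartbeats = 0;

    // poll() heartbeats as a side effect; while paused it returns no records.
    List<String> poll() {
        heartbeats++;
        if (paused || records.isEmpty()) return List.of();
        List<String> batch = List.copyOf(records);
        records.clear();
        return batch;
    }

    void pauseAll() { paused = true; }
    void resumeAll() { paused = false; }
}

public class PausePollLoop {
    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();

        // 1. Poll to get a batch of records.
        List<String> batch = consumer.poll();

        // 2. Pause all partitions so further polls return nothing.
        consumer.pauseAll();

        // 3. Process the batch, calling poll() periodically purely as a
        //    heartbeat (it returns no records while paused).
        for (String record : batch) {
            // ... long per-record processing would go here ...
            consumer.poll();
        }

        // 4. "Commit offsets" and un-pause before the next real poll.
        consumer.resumeAll();

        System.out.println("processed=" + batch.size() + " heartbeats=" + consumer.heartbeats);
    }
}
```

One caveat with the real client that the toy hides: a poll issued mid-batch can still trigger a rebalance and revoke partitions, so a processing loop using this trick has to be prepared for reassignment before the batch is committed.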