Guozhang and Jason, I think we are on the same page that having rebalances done in the background thread has a much bigger impact on the users. So I agree that it is probably better to start with having 1) and 2). We can add 3) later if necessary.
Another implementation detail I am not quite sure about is making the NetworkClient work with two threads. The KIP implies that this will be done by synchronizing on ConsumerNetworkClient. I am not sure that is enough: what if a poll() from ConsumerNetworkClient receives a FetchResponse or OffsetFetchResponse which is supposed to be handled by the user thread? This is an implementation detail but may be worth thinking about a bit more.

Thanks,

Jiangjie (Becket) Qin

On Mon, Jun 6, 2016 at 11:27 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> Jiangjie:
>
> About doing the rebalance in the background thread, I'm a bit concerned as it will change a lot of the concurrency guarantees that the consumer currently provides (think of a consumer caller thread committing externally while the rebalance is happening in the background thread), and hence if we are considering changing that now or in the future, we need to think through all the corner cases.
>
> So in general, I'd still prefer we reserve a third config for rebalance timeout in this KIP.
>
> Guozhang
>
> On Mon, Jun 6, 2016 at 11:25 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > (+ Matthias)
> >
> > Hello Henry,
> >
> > Specifically to your question regarding Kafka Streams:
> >
> > 1. Currently restoreActiveState() is triggered in the onPartitionAssigned callback, which is after the rebalance is completed from the coordinator's point of view, and hence is covered in the process timeout value in this new KIP.
> >
> > 2. That is a good question, and I think it is a general root cause; we saw failures of directory locking reported by more than one use case already.
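Becket's concern can be sketched with a toy model: if two threads share one network client guarded by a lock, whichever thread happens to call poll() may receive a response the other thread is waiting for, so completed responses have to be routed to the thread that owns the request rather than handled inline. The class and method names below are hypothetical, a simulation of the problem, not the actual ConsumerNetworkClient internals.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy model of one network client shared by the user thread and the
// heartbeat thread. poll() is synchronized (as the KIP implies), but a
// completed response is enqueued for the thread that owns the request
// instead of being handled by whichever thread happened to call poll().
final class SharedClientSketch {
    private final Map<String, Queue<String>> completedByOwner = new ConcurrentHashMap<>();

    // ownerId identifies the thread that issued the request this response answers
    synchronized void poll(String response, String ownerId) {
        completedByOwner
            .computeIfAbsent(ownerId, k -> new ConcurrentLinkedQueue<>())
            .add(response);
    }

    Queue<String> completed(String ownerId) {
        return completedByOwner.getOrDefault(ownerId, new ConcurrentLinkedQueue<>());
    }
}
```

Without the routing step, a FetchResponse pulled down by the heartbeat thread's poll() would have no safe handler, which is exactly the hazard raised above.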
> > Currently I believe the main reason a second rebalance is triggered while the processors are still completing restoreActiveState() from the previous rebalance is the session timeout (default 30 seconds), which will be largely reduced with a larger processor timeout; however, with complex topologies, restoring all states via restoreActiveState() may still take a long time with tens / hundreds of state stores. There are also other cases that can cause consumers to re-join the group right after a previous rebalance, for example 1) regex subscription where the topic metadata has changed, 2) consecutive consumer failures, or new consumers (i.e. new KStream instances / threads) added.
> >
> > For such cases we can do a better job to "fail fast" if the consumer detects another join is needed. I think in one of your local commits you are already doing something similar, which we can merge back to trunk.
> >
> > Guozhang
> >
> > On Sun, Jun 5, 2016 at 11:24 PM, Henry Cai <h...@pinterest.com.invalid> wrote:
> >> I have a question on the KIP about the long stall during ProcessorStateManager.restoreActiveState(); this can be a long stall when we need to rebuild the RocksDB state on a new node.
> >>
> >> 1. Is restoreActiveState() considered post-rebalance, since it is invoked in the application rebalance listener?
> >> 2. When node A is spending a long time rebuilding the state in restoreActiveState() from the previous rebalance and a new node (node B) sends a new JoinGroup request to the co-ordinator, how long should the coordinator wait for node A to finish the restoreActiveState from the previous rebalance? The restoreActiveState can take more than 10 minutes for a big state.
> >>
> >> On Sun, Jun 5, 2016 at 10:46 PM, Becket Qin <becket....@gmail.com> wrote:
> >> > Hi Jason,
> >> >
> >> > Thanks for this very useful KIP.
In general I am with Guozhang on the
> >> > purpose of the three timeouts:
> >> > 1) session timeout for consumer liveness,
> >> > 2) process timeout (or maybe we should rename it to max.poll.interval.ms) for application liveness,
> >> > 3) rebalance timeout for faster rebalance in some failure cases.
> >> >
> >> > It seems the current discussion is mainly about whether we need 3) as a separate timeout or not. The current KIP proposal is to combine 2) and 3), i.e. just use the process timeout as the rebalance timeout. That means we need to either increase the rebalance timeout to let it adapt to the process timeout, or the reverse. It would be helpful to understand the impact of these two cases. Here are my two cents.
> >> >
> >> > For users who are consuming data from Kafka, usually they either care about throughput or care about latency.
> >> >
> >> > If users care about latency, they would probably care more about average latency instead of 99.99 percentile latency, which can be affected by many other more common causes than consumer failure. Because all the timeouts we are discussing here only have an impact on the 99.99 percentile latency, I don't think it would really make a difference for latency-sensitive users.
> >> >
> >> > The majority of the use cases for Kafka Connect and Mirror Maker are throughput sensitive. Ewen raised a good example where Kafka Connect needs to process the previous data on rebalance and therefore requires a higher rebalance timeout than process timeout. This is essentially the same in Mirror Maker, where each rebalance needs to flush all the messages in the accumulator in the producer. That could take some time depending on how many messages are there.
In this case, we may need to increase the
> >> > process timeout to make it the same as the rebalance timeout. But this is probably fine. The downside of increasing the process timeout is a longer detection time for a consumer failure. Detecting a consumer failure a little later only has limited impact because the rest of the consumers in the same group are still working fine, so the total throughput is unlikely to drop significantly. As long as the rebalance is not taking longer, it should be fine. The reason we care more about how fast a rebalance can finish is that during a rebalance no consumer in the group is consuming, i.e. throughput is zero. So we want to make the rebalance finish as quickly as possible.
> >> >
> >> > Compared with increasing the process timeout to the rebalance timeout, the more common case seems to be that the user wants a longer process timeout but a smaller rebalance timeout. I am more worried about this case, where we have to shoehorn the rebalance timeout into the process timeout. For users who care about throughput, that might cause the rebalance to take unnecessarily long. Admittedly this only has an impact when a consumer has a problem during a rebalance, but depending on how high the process timeout was set, the rebalance could potentially take forever, like Guozhang mentioned.
> >> >
> >> > I agree with Guozhang that we can start with 1) and 2) and add 3) later if needed. But adding a rebalance timeout is more involved than just adding a configuration. It also means the rebalance has to be done in the background heartbeat thread. Hence we have to synchronize rebalance and consumer.poll() like we did in the old consumer.
Otherwise the user may lose
> >> > messages if auto commit is enabled, or a manual commit might fail after a consumer.poll() because the partitions might have been reassigned. So having a separate rebalance timeout also potentially means a big change for the users as well.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> > On Fri, Jun 3, 2016 at 11:45 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >> > > Hey Ewen,
> >> > >
> >> > > I confess your comments caught me off guard. It never occurred to me that anyone would ask for a rebalance timeout so that it could be set _larger_ than the process timeout. Even with buffered or batch processing, I would usually expect flushing before a rebalance to take no more time than a periodic flush. Otherwise, I'd probably try to see if there was some workload I could push into periodic flushes so that rebalances could complete faster. But supposing this isn't possible or practical in some cases, I'm wondering how limiting it would be in practice to have only the one timeout in this case. I'm a little reluctant to add the additional timeout since I think most users would not have a strong need to keep a tight bound on normal processing time. (I'm also reminded that Jay mentioned he might have to dock everyone's pay 5% for each new timeout we introduce ;-)
> >> > >
> >> > > Thanks,
> >> > > Jason
> >> > >
> >> > > On Thu, Jun 2, 2016 at 7:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > > > Hi Ewen,
> >> > > >
> >> > > > I think you are right, the rebalance process could potentially involve all the delayed compute / IO.
More specifically, this is what I think of the rebalance process:
> >> > > >
> >> > > > 1. Coordinator decides to rebalance; starts ticking based on the rebalance timeout.
> >> > > > 2. Consumer realizes a rebalance is needed when calling poll(); triggers onPartitionRevoked().
> >> > > > 3. Consumer sends JoinGroupRequest.
> >> > > > 4. Coordinator sends JoinGroupResponse; starts ticking on the leader.
> >> > > > 5. Leader computes and sends SyncGroupRequest.
> >> > > > 6. Coordinator sends SyncGroupResponse; starts ticking on the session timeout.
> >> > > > 7. Consumer gets the new assignment; triggers onPartitionAssigned().
> >> > > >
> >> > > > In the above process: delayed compute / IO is usually done at step 2); workload initialization is usually done in step 7); and some admin work (like in Kafka Streams) is likely to be done in step 5). As in the current KIP proposal, the rebalance timeout on the coordinator starts ticking at 1) for everyone in the group and stops ticking at 3); it starts ticking on the leader again at step 4) and stops upon step 5). In this case the delayed compute / IO contained in step 2) is covered by this rebalance timeout.
> >> > > >
> >> > > > That being said, I think for the "worst case", the time of processing a single record would still be similar to rebalancing, since both could result in completing all delayed compute / IO so far. And since the "processing timeout" is used to cover the worst case, it should still be OK?
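Guozhang's ticking description can be condensed into a small lookup. This is a sketch of the email's prose only, with his step numbers, not broker logic: the rebalance timeout ticks for every member from step 1 until the JoinGroupRequest in step 3, ticks again for the leader from step 4 until the SyncGroupRequest in step 5, and the session timeout takes over at step 6.

```java
// Which timeout the coordinator is ticking against a member during the
// rebalance steps listed above (step numbers follow the email).
enum Ticking { REBALANCE_ALL, REBALANCE_LEADER, SESSION, NONE }

final class RebalanceTicksSketch {
    static Ticking during(int step) {
        if (step == 1 || step == 2) return Ticking.REBALANCE_ALL;   // stops once JoinGroup is sent (step 3)
        if (step == 4) return Ticking.REBALANCE_LEADER;             // stops once SyncGroup is sent (step 5)
        if (step >= 6) return Ticking.SESSION;                      // coordinator is back on the session timeout
        return Ticking.NONE;                                        // steps 3 and 5 themselves
    }
}
```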
> >> > > > Guozhang
> >> > > >
> >> > > > On Thu, Jun 2, 2016 at 5:55 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> >> > > > > Jason,
> >> > > > >
> >> > > > > I've been thinking about this more in terms of something like Connect. I think the rebalance timeout may be a bit different from the process timeout, and even the process timeout is a bit of a misnomer.
> >> > > > >
> >> > > > > We sort of talk about the process timeout as if it can be an indicator of maximum processing time for a record/batch. This makes sense for a case of a data-dependent load (i.e. you can only load some data from slow storage after seeing some data) where that load might be very large compared to normal processing time. It also makes sense if you have auto commit enabled, because you need to be completely finished processing the data before calling poll() again, so the time before you call another consumer API actually reflects processing time.
> >> > > > >
> >> > > > > It might make less sense in cases like streams (or any other app) that batch writes to disk, or connectors that "process" a message by enqueuing the data but won't commit offsets until data is flushed, possibly during some other, much later iteration of processing. In this case I think processing time and rebalance time could potentially differ significantly. During normal processing, you can potentially pipeline quite a bit, buffering up changes, flushing as needed, but then only committing once flushing is complete.
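The pipelined pattern Ewen describes can be sketched as a toy buffered sink (hypothetical names, not Connect's actual sink API): processing only enqueues, and the offset that is safe to commit lags behind until a flush completes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy buffered sink: process() only enqueues, so it is cheap; the offset
// that is safe to commit reflects flushed data, not processed data. On a
// rebalance the buffer must drain completely before committing everything.
final class BufferedSinkSketch {
    private final List<Long> buffer = new ArrayList<>(); // offsets processed but not yet flushed
    private long flushedUpTo = -1L;

    void process(long offset) {
        buffer.add(offset); // normal processing: enqueue and return immediately
    }

    // The expensive step that a rebalance forces in full.
    void flush() {
        if (!buffer.isEmpty()) {
            flushedUpTo = buffer.get(buffer.size() - 1);
            buffer.clear();
        }
    }

    long committableOffset() {
        return flushedUpTo;
    }
}
```

The gap between the last processed offset and committableOffset() is exactly the work a rebalance would force to completion at once.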
But rebalancing is different then -- you > >> *must* > >> > > > > finish flushing all the data or manually choose to discard the > >> data > >> > > > > (presumably by doing something like watching for the process > >> timeout > >> > > you > >> > > > > set and bailing early, only committing the offsets for data > you've > >> > > > > flushed). If you have lots of data built up, the cost for > >> rebalancing > >> > > > could > >> > > > > be a *lot* higher than the maximum time you would otherwise see > >> > between > >> > > > > calls to consumer APIs to indicate processing progress. > >> > > > > > >> > > > > The thing that makes these cases different is that processing > >> isn't > >> > > > > actually tied to calls to the consumer API. You can queue up / > >> > > pipeline / > >> > > > > defer some of the work. (By the way, this is currently a > >> limitation > >> > of > >> > > > sink > >> > > > > connectors that I'm not thrilled about -- offset commit > requires a > >> > full > >> > > > > flush, whereas some coordination with the sink connector to not > >> > > require a > >> > > > > full flush except on rebalances would be much nicer, albeit more > >> > > > difficult > >> > > > > for sink connectors to implement.) > >> > > > > > >> > > > > -Ewen > >> > > > > > >> > > > > > >> > > > > > >> > > > > On Thu, Jun 2, 2016 at 5:14 PM, Jason Gustafson < > >> ja...@confluent.io> > >> > > > > wrote: > >> > > > > > >> > > > > > Hey Guozhang, > >> > > > > > > >> > > > > > I'm actually not too concerned about the time spent in the > >> > rebalance > >> > > > > > callback specifically. Both it and regular processing time in > >> the > >> > > poll > >> > > > > loop > >> > > > > > will delay the rebalance and keep joined consumers idle. 
> >> > > > > > However, if we expose the rebalance timeout, then it would give users the option to effectively disable the process timeout while still keeping a maximum bound on the rebalance time. If the consumer cannot complete its processing fast enough and rejoin, then it would be evicted. This provides something like (2), since the other consumers in the group would be able to complete the rebalance and resume work while the evicted consumer would have to roll back progress. This is not too different from rebalancing in the background, which also typically would cause commit failure and rollback (though at least the consumer stays in the group).
> >> > > > > >
> >> > > > > > Now that I'm thinking about it more, I'm not sure this would be a great facility to depend on in practice. It might be OK if just one or two of the consumers fall out of the group during the rebalance, but if half the group is regularly getting evicted, it would be a problem. So even if we expose the rebalance timeout, the user is still going to have to set it with some idea in mind about how long processing should take.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Jason
> >> > > > > >
> >> > > > > > On Thu, Jun 2, 2016 at 2:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > > > > > > Hi Jason,
> >> > > > > > >
> >> > > > > > > With the current usage pattern of:
> >> > > > > > >
> >> > > > > > > while(..)
> >> > > > > > > {
> >> > > > > > >   consumer.poll(/* where rebalance happens */)
> >> > > > > > >   // process messages
> >> > > > > > > }
> >> > > > > > >
> >> > > > > > > ----------
> >> > > > > > >
> >> > > > > > > And since the rebalance is still on the caller thread, not the background thread, if the coordinator decides to rebalance while the user thread is still processing messages, there is no option but to go with 1), right? I think your / Onur's point here, which I agree with, is that by reusing the process timeout as the rebalance timeout, if the rebalance callback could take longer than processing a batch, users need to set the timeout value to the higher of the two, i.e. the callback latency, which will make detection of processing stalls less effective, right?
> >> > > > > > >
> >> > > > > > > As I mentioned in my previous email, I feel that this case of "callback function time taking longer than processing a batch" would not be frequent in practice, and the processing timeout would usually be a good higher bound on the callback function latency. If that is true, I'd suggest we keep the current proposal and not add a third timeout config for covering this case.
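The trade-off above reduces to simple arithmetic: with a single shared timeout, the configured value has to cover the larger of the worst-case batch-processing time and the worst-case rebalance-callback time, so a long callback inflates the bound used to detect processing stalls. A sketch with hypothetical numbers:

```java
// With one shared timeout (the KIP proposal), the configured value must
// exceed the worst case of BOTH activities; with separate timeouts, each
// bound could be kept tight independently.
final class TimeoutBudgetSketch {
    static long sharedTimeoutMs(long worstBatchMs, long worstCallbackMs, long slackMs) {
        return Math.max(worstBatchMs, worstCallbackMs) + slackMs;
    }
}
```

For example, a 30-second worst-case batch with a 2-minute worst-case callback forces the shared timeout above 2 minutes, even though stalls in normal processing could have been detected after about 30 seconds.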
> >> > > > > > > > >> > > > > > > > >> > > > > > > Guozhang > >> > > > > > > > >> > > > > > > > >> > > > > > > On Thu, Jun 2, 2016 at 10:40 AM, Jason Gustafson < > >> > > ja...@confluent.io > >> > > > > > >> > > > > > > wrote: > >> > > > > > > > >> > > > > > > > Hey Guozhang, > >> > > > > > > > > >> > > > > > > > I think the problem is that users may not want to > sacrifice > >> > > > rebalance > >> > > > > > > > latency because of uncertainty around processing time. As > >> soon > >> > > as a > >> > > > > > > > rebalance begins, there are basically two choices: > >> > > > > > > > > >> > > > > > > > 1. Block the rebalance until all consumers have finished > >> their > >> > > > > current > >> > > > > > > > processing. > >> > > > > > > > 2. Let all consumers rebalance and "rollback" any > processing > >> > that > >> > > > > could > >> > > > > > > not > >> > > > > > > > be committed before the rebalance completes. > >> > > > > > > > > >> > > > > > > > If you choose option (1), then you have an incentive to > >> keep a > >> > > > > > relatively > >> > > > > > > > tight bound on process.timeout.ms in order to reduce the > >> > > > worst-case > >> > > > > > idle > >> > > > > > > > time during a rebalance. But if you fail to set it high > >> enough, > >> > > > then > >> > > > > > > you'll > >> > > > > > > > get spurious rebalances during normal processing. I think > >> Onur > >> > is > >> > > > > > saying > >> > > > > > > > that this still sort of sucks for users. On the other > hand, > >> if > >> > > (2) > >> > > > is > >> > > > > > > > acceptable, then users will have more freedom to err on > the > >> > high > >> > > > side > >> > > > > > > when > >> > > > > > > > setting process.timeout.ms, or even disable it entirely. 
> >> > > > > > > They will have to deal with rolling back any progress which cannot be committed after the rebalance completes, but maybe this is less of a problem for some users?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > Jason
> >> > > > > > >
> >> > > > > > > On Wed, Jun 1, 2016 at 10:23 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > > > > > > > Hi Onur, Jason:
> >> > > > > > > >
> >> > > > > > > > Here are some thoughts about reusing the process timeout as the server-side rebalance timeout. First of all, my understanding is that
> >> > > > > > > >
> >> > > > > > > > 1) session timeout is for detecting consumer crashes / hard failures (in this case the heartbeat thread will be dead as well, hence the coordinator realizes it within the session timeout value).
> >> > > > > > > >
> >> > > > > > > > 2) process timeout is for checking liveness of the user thread that calls the consumer as well as does the processing: when no consumer calls are made within the process timeout, the heartbeat thread stops working and hence it will be detected by the coordinator.
> >> > > > > > > > 3) a potential server-side rebalance timeout would be used to detect consumer liveness during the rebalance period, in which the user thread is tied up with the "poll" call and also the callback function, to prevent a slow / stalled consumer in its rebalance callback from causing the rebalance to take forever.
> >> > > > > > > >
> >> > > > > > > > I think we generally have two cases in practice regarding 3) above: the user either does almost nothing and hence should never be stalled (unless there is a long GC), or they do various external IOs, for example for maintaining their own states, which could take long or even cause the thread to stall. We do not need to worry too much about the former case, and as for the latter case, the process timeout value should usually be a good higher bound on the rebalance latency.
> >> > > > > > > >
> >> > > > > > > > That being said, if we observe that there is indeed a common usage where 2) and 3) require very different timeout values, outweighing the complexity of a third timeout value, we can consider adding one then: it is easier to add more configs later.
> >> > > > > > > >
> >> > > > > > > > What do you think?
> >> > > > > > > >
> >> > > > > > > > Guozhang
> >> > > > > > > >
> >> > > > > > > > On Tue, May 31, 2016 at 2:35 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >> > > > > > > > > Hey Onur,
> >> > > > > > > > >
> >> > > > > > > > > Thanks for the detailed response. I think the problem of controlling rebalance times is the main (known) gap in the proposal as it stands.
> >> > > > > > > > >
> >> > > > > > > > > > This burden goes away if you loosen the liveness property by having a required rebalance time and optional processing time where rebalance happens in the background thread as stated in the KIP.
> >> > > > > > > > >
> >> > > > > > > > > Just to clarify, the current KIP only allows rebalances to complete in the foreground. What I suggested above in reply to Grant was that we could add a separate rebalance timeout setting; the behavior I had in mind was to let the consumer fall out of the group if the timeout is reached while the consumer is still processing. I was specifically trying to avoid moving the rebalance to the background thread since this significantly increases the complexity of the implementation.
We'd also have to > >> think > >> > > about > >> > > > > > > > > > compatibility a bit more. For example, what are the > >> > > > implications > >> > > > > of > >> > > > > > > > > having > >> > > > > > > > > > the rebalance listener execute in a separate thread? > >> > > > > > > > > > > >> > > > > > > > > > Putting that issue aside, I think we need to convince > >> > > ourselves > >> > > > > > that > >> > > > > > > a > >> > > > > > > > > > separate rebalance timeout is really necessary since > >> every > >> > > new > >> > > > > > > timeout > >> > > > > > > > > adds > >> > > > > > > > > > some conceptual noise which all users will see. My > >> thought > >> > in > >> > > > > this > >> > > > > > > KIP > >> > > > > > > > > was > >> > > > > > > > > > that users who didn't want the burden of tuning the > >> process > >> > > > > timeout > >> > > > > > > > could > >> > > > > > > > > > use a relatively large value without a major impact > >> because > >> > > > group > >> > > > > > > > > > rebalances themselves will typically be infrequent. > The > >> > main > >> > > > > > concern > >> > > > > > > is > >> > > > > > > > > for > >> > > > > > > > > > users who have highly variant processing times and > want > >> to > >> > > > > ensure a > >> > > > > > > > tight > >> > > > > > > > > > bound on rebalance times (even if it means having to > >> > discard > >> > > > some > >> > > > > > > > > > processing that cannot be completed before the > rebalance > >> > > > > finishes). > >> > > > > > > > These > >> > > > > > > > > > users will be left trying to tune process.timeout.ms > >> and > >> > > > > > > > > max.poll.records, > >> > > > > > > > > > which is basically the same position they are > currently > >> in. 
> >> > > > > > > > > The problem is I don't know how common this case is, so I'm not sure how it weighs against the cost of having an additional timeout that needs to be explained. We can always add the rebalance timeout later, but it will be tough to remove once it's there. All the same, I'm not that keen on another iteration of this problem, so if we believe this use case is common enough, then maybe we should add it now.
> >> > > > > > > > >
> >> > > > > > > > > Thanks,
> >> > > > > > > > > Jason
> >> > > > > > > > >
> >> > > > > > > > > On Sat, May 28, 2016 at 3:10 AM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
> >> > > > > > > > > > Thanks for the KIP writeup, Jason.
> >> > > > > > > > > >
> >> > > > > > > > > > Before anything else, I just wanted to point out that it's worth mentioning the "heartbeat.interval.ms" consumer config in the KIP for completeness. Today this config only starts to kick in if poll is called frequently enough. A separate heartbeat thread should make this config behave more like what people would expect: a separate thread sending heartbeats at the configured interval.
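Onur's expectation for the background heartbeat thread can be sketched deterministically (illustrative names, not the consumer's real implementation): a heartbeat is due whenever `heartbeat.interval.ms` has elapsed since the last one, independent of when the user thread last called poll().

```java
// Deterministic core of a heartbeat loop: the background thread sends a
// heartbeat whenever the interval has elapsed, regardless of poll() activity
// on the user thread.
final class HeartbeatSketch {
    private final long intervalMs;
    private long lastHeartbeatMs;

    HeartbeatSketch(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastHeartbeatMs = startMs;
    }

    // Returns true when a heartbeat should be (and is recorded as) sent now.
    boolean maybeHeartbeat(long nowMs) {
        if (nowMs - lastHeartbeatMs >= intervalMs) {
            lastHeartbeatMs = nowMs; // a real client would send the request here
            return true;
        }
        return false;
    }
}
```

A background thread driving this with the wall clock would heartbeat on schedule even while the user thread is stuck in record processing, which is the behavior change Onur describes.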
> >> > > > > > > > > > > > >> > > > > > > > > > > With this KIP, the relevant configs become: > >> > > > > > > > > > > "max.poll.records" - already exists > >> > > > > > > > > > > "session.timeout.ms" - already exists > >> > > > > > > > > > > "heartbeat.interval.ms" - already exists > >> > > > > > > > > > > "process.timeout.ms" - new > >> > > > > > > > > > > > >> > > > > > > > > > > After reading the KIP several times, I think it > would > >> be > >> > > > > helpful > >> > > > > > to > >> > > > > > > > be > >> > > > > > > > > > more > >> > > > > > > > > > > explicit in the desired outcome. Is it trying to > make > >> > > faster > >> > > > > > > > > > > best/average/worst case rebalance times? Is it > trying > >> to > >> > > make > >> > > > > the > >> > > > > > > > > clients > >> > > > > > > > > > > need less configuration tuning? > >> > > > > > > > > > > > >> > > > > > > > > > > Also it seems that brokers probably still want to > >> enforce > >> > > > > minimum > >> > > > > > > and > >> > > > > > > > > > > maximum rebalance timeouts just as with the minimum > >> and > >> > > > maximum > >> > > > > > > > session > >> > > > > > > > > > > timeouts so DelayedJoins don't stay in purgatory > >> > > > indefinitely. > >> > > > > So > >> > > > > > > > we'd > >> > > > > > > > > > add > >> > > > > > > > > > > new "group.min.rebalance.timeout.ms" and " > >> > > > > > > > > group.max.rebalance.timeout.ms > >> > > > > > > > > > " > >> > > > > > > > > > > broker configs which again might need to be brought > >> up in > >> > > the > >> > > > > > KIP. > >> > > > > > > > > Let's > >> > > > > > > > > > > say we add these bounds. 
A side-effect of having broker-side bounds on rebalance timeouts, in combination with Java clients that make the process timeout the same as the rebalance timeout, is that the broker effectively dictates the max processing time allowed between poll calls. This gotcha exists right now with today's broker-side bounds on session timeouts. So I'm not really convinced that the proposal gets rid of this complication mentioned in the KIP.

I think the main question to ask is: does the KIP actually make a difference?

It looks like this KIP improves rebalance times specifically when the client currently has processing times large enough to force larger session timeouts and prevent heartbeat intervals from being honored. Separating session timeouts from processing time means clients can keep their "session.timeout.ms" low so the coordinator can quickly detect process failure, and honoring a low "heartbeat.interval.ms" on the separate heartbeat thread means clients will be quickly notified of group membership and subscription changes - all without placing difficult expectations on processing time. But even so, rebalancing through the calling thread means the slowest-processing client in the group will still be the rate-limiting step when looking at rebalance times.

From a usability perspective, the burden still seems like it will be tuning the processing time to keep the "progress liveness" happy during rebalances while still having reasonable upper bounds on rebalance times. It still looks like users have to do almost exactly the same tricks as today when group membership changes due to slow processing times, even though all the consumers are alive and the topics haven't changed:

1. Increase the rebalance timeout to give more time for record processing (the difference compared to today is that we bump the rebalance timeout instead of the session timeout).
2. Reduce the number of records handled on each iteration with max.poll.records.

This burden goes away if you loosen the liveness property by having a required rebalance time and an optional processing time, where the rebalance happens in the background thread as stated in the KIP.

On Thu, May 26, 2016 at 12:40 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Grant,

Thanks for the feedback. I'm definitely open to including heartbeat() in this KIP. One thing we should be clear about is what the behavior of heartbeat() should be when the group begins rebalancing. I think there are basically two options:

1. heartbeat() simply keeps heartbeating even if the group has started rebalancing.
2. heartbeat() completes the rebalance itself.
With the first option, when processing takes longer than the rebalance timeout, the member will fall out of the group, which will cause an offset commit failure when it finally finishes. However, if processing finishes before the rebalance completes, then offsets can still be committed. On the other hand, if heartbeat() completes the rebalance itself, then you'll definitely see the offset commit failure for any records being processed. So the first option is sort of biased toward processing completion, while the latter is biased toward rebalance completion.

I'm definitely not a fan of the second option since it takes away the choice to finish processing before rejoining. However, I do see some benefit in the first option if the user wants to keep rebalance time low and doesn't mind being kicked out of the group if processing takes longer during a rebalance. This may be a reasonable tradeoff since consumer groups are presumed to be stable most of the time. A better option in that case might be to expose the rebalance timeout to the user directly, since it would allow the user to use an essentially unbounded process.timeout.ms for highly variant processing while still keeping rebalance time limited. Of course, it would be another timeout for the user to understand...

Thanks,
Jason

On Thu, May 26, 2016 at 8:19 AM, Grant Henke <ghe...@cloudera.com> wrote:

Hi Jason,

Thanks for writing up a proposal (and a thorough one)! This is something that I had been thinking about this week too, as I have run into it more than a handful of times now.

I like the idea of having a larger processing timeout; that timeout in unison with max.poll.records should in many cases provide a reasonable assurance that the consumer will stay alive.
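As a rough sketch of the combination described above, the consumer configuration might look like the following (values are purely illustrative, and "process.timeout.ms" is only the name proposed in this KIP discussion, not a released config):

```java
import java.util.Properties;

public class ConsumerTuningSketch {
    // Illustrative values only; "process.timeout.ms" is the new config
    // proposed in KIP-62 and its final name could differ.
    static Properties tunedConsumerProps() {
        Properties props = new Properties();
        // Low session timeout: the coordinator detects hard failures fast,
        // since heartbeats would now come from a background thread.
        props.put("session.timeout.ms", "10000");
        props.put("heartbeat.interval.ms", "3000");
        // Generous processing allowance between poll() calls.
        props.put("process.timeout.ms", "300000");
        // Cap the batch size so each iteration's processing time is bounded.
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedConsumerProps().getProperty("process.timeout.ms")); // prints 300000
    }
}
```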
In the rejected alternatives, "Add a separate API the user can call to indicate liveness" is listed. I think a heartbeat api could be added along with these new timeout configurations and used for "advanced" use cases where the processing time could be highly variant and less predictable. I think a place where we might use the heartbeat api in Kafka is MirrorMaker.

Today, I have seen people trying to find ways to leverage the existing api to "force" heartbeats by:

1. Call poll to get the batch of records to process
2. Call pause on all partitions
3. Process the record batch
3a. While processing, periodically call poll (which is essentially just a heartbeat since it returns no records while paused)
4. Commit offsets and un-pause
5. Repeat from 1

Thanks,
Grant

On Wed, May 25, 2016 at 6:32 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hi All,

One of the persistent problems we see with the new consumer is the use of the session timeout in order to ensure progress. Whenever there is a delay in message processing which exceeds the session timeout, no heartbeats can be sent and the consumer is removed from the group. We seem to hit this problem everywhere the consumer is used (including Kafka Connect and Kafka Streams) and we don't always have a great solution. I've written a KIP to address this problem here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
Have a look and let me know what you think.

Thanks,
Jason

--
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke

--
Thanks,
Ewen

--
-- Guozhang
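For reference, the "forced heartbeat" workaround Grant describes (steps 1-5 above) has roughly the following control flow. This sketch uses a toy stand-in for the consumer so it is self-contained and runnable; a real implementation would use KafkaConsumer's poll(), pause(), resume(), and commitSync() instead:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Toy stand-in for KafkaConsumer, just to make the control flow runnable.
class ToyConsumer {
    private final Queue<List<String>> batches = new ArrayDeque<>();
    private boolean paused = false;
    int committed = 0;

    ToyConsumer(List<List<String>> data) { batches.addAll(data); }

    // While paused, poll() returns no records but still acts as a heartbeat.
    List<String> poll() {
        if (paused || batches.isEmpty()) return List.of();
        return batches.remove();
    }
    void pauseAll() { paused = true; }
    void resumeAll() { paused = false; }
    void commitSync() { committed++; }
}

public class ForcedHeartbeatLoop {
    // Steps 1-5 from Grant's message: poll, pause, process (polling
    // periodically so heartbeats keep flowing), commit, un-pause, repeat.
    static int run(ToyConsumer consumer) {
        int processed = 0;
        List<String> batch;
        while (!(batch = consumer.poll()).isEmpty()) {  // 1. fetch a batch
            consumer.pauseAll();                        // 2. pause all partitions
            for (String record : batch) {               // 3. process the batch
                processed++;                            //    (simulated work)
                consumer.poll();                        // 3a. heartbeat-only poll
            }
            consumer.commitSync();                      // 4. commit offsets
            consumer.resumeAll();                       //    and un-pause
        }                                               // 5. repeat from 1
        return processed;
    }

    public static void main(String[] args) {
        ToyConsumer c = new ToyConsumer(List.of(List.of("a", "b"), List.of("c")));
        System.out.println(run(c)); // prints 3
    }
}
```

Note that the mid-processing poll() in step 3a only works as a pure heartbeat because the partitions are paused; without the pause, it would return (and implicitly commit position past) records the application has not processed yet.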