Guozhang and Jason,

I think we are on the same page that having rebalances done in the
background thread has a much bigger impact on users. So I agree that it
is probably better to start with 1) and 2). We can add 3) later if
necessary.
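
To make 1) and 2) concrete, here is a rough sketch of how a client might
set the two timeouts. The key names are the ones used in this thread
(process.timeout.ms is only the name currently proposed in the KIP and may
be renamed, e.g. to max.poll.interval.ms). The TimeoutConfigSketch class
and the values are made up for illustration and are not part of the Kafka
client.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of the Kafka client.
class TimeoutConfigSketch {

    // Builds consumer properties carrying the two timeouts discussed as 1) and 2).
    static Properties consumerTimeouts(int sessionTimeoutMs, int processTimeoutMs) {
        if (sessionTimeoutMs <= 0 || processTimeoutMs <= 0) {
            throw new IllegalArgumentException("timeouts must be positive");
        }
        Properties props = new Properties();
        // 1) consumer liveness: detected through heartbeats
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        // 2) application liveness: maximum time allowed between poll() calls
        //    (proposed name in the KIP; an assumption, since it may still change)
        props.setProperty("process.timeout.ms", Integer.toString(processTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Example values only: a short session timeout and a generous process timeout.
        Properties props = consumerTimeouts(10_000, 300_000);
        System.out.println(props);
    }
}
```

With only these two settings, the process timeout would also serve as the
rebalance timeout, which is the combination of 2) and 3) discussed below.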

Another implementation detail I am not quite sure about is making the
NetworkClient work with two threads. The KIP implies that this will be done
by synchronizing on ConsumerNetworkClient. I am not sure that is enough:
what if a poll() on ConsumerNetworkClient made by the background thread
receives a FetchResponse or OffsetFetchResponse, which is supposed to be
handled by the user thread? This is an implementation detail, but it may be
worth thinking about a bit more.
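
To illustrate the concern, here is a minimal sketch. It is not the actual
ConsumerNetworkClient; every class and method name below is hypothetical.
The point is that a synchronized poll() protects the client's state, but it
does not decide which thread handles a response, so responses meant for
the user thread would have to be handed over explicitly (here via a queue).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch; none of these types are Kafka classes.
class SharedClientSketch {
    enum Kind { HEARTBEAT, FETCH, OFFSET_FETCH }

    static final class Response {
        final Kind kind;
        final String payload;
        Response(Kind kind, String payload) { this.kind = kind; this.payload = payload; }
    }

    // Stand-in for responses that have arrived from the network.
    private final Queue<Response> wire = new ArrayDeque<>();
    // Responses that only the user thread is allowed to process.
    private final Queue<Response> pendingForUser = new ConcurrentLinkedQueue<>();
    private int heartbeatsHandled = 0;

    synchronized void deliver(Response r) { wire.add(r); }

    // May be called from either thread. Heartbeat responses are handled in
    // place; everything else is deferred to the user thread.
    synchronized void poll() {
        Response r;
        while ((r = wire.poll()) != null) {
            if (r.kind == Kind.HEARTBEAT) {
                heartbeatsHandled++;
            } else {
                pendingForUser.add(r);
            }
        }
    }

    // Intended to be called only from the user thread.
    List<Response> drainForUser() {
        List<Response> out = new ArrayList<>();
        Response r;
        while ((r = pendingForUser.poll()) != null) {
            out.add(r);
        }
        return out;
    }

    synchronized int heartbeatsHandled() { return heartbeatsHandled; }

    public static void main(String[] args) throws InterruptedException {
        SharedClientSketch client = new SharedClientSketch();
        client.deliver(new Response(Kind.FETCH, "records"));
        client.deliver(new Response(Kind.HEARTBEAT, "ok"));

        // The background thread happens to be the one that polls.
        Thread heartbeat = new Thread(client::poll, "heartbeat");
        heartbeat.start();
        heartbeat.join();

        // The fetch response was not lost; the user thread picks it up later.
        List<Response> mine = client.drainForUser();
        System.out.println("heartbeats=" + client.heartbeatsHandled()
                + ", deferred=" + mine.size());
    }
}
```

Whether the real client needs something like this, or already separates
the two kinds of responses some other way, is exactly the part I am unsure
about.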

Thanks,

Jiangjie (Becket) Qin


On Mon, Jun 6, 2016 at 11:27 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Jiangjie:
>
> About doing the rebalance in the background thread, I'm a bit concerned as
> it will change a lot of the concurrency guarantees that consumer currently
> provides (think of a consumer caller thread committing externally while the
> rebalance is happening in the background thread), and hence if we are
> considering changing that now or in the future, we need to think through
> all the corner cases.
>
> So in general, I'd still prefer we reserve a third config for rebalance
> timeout in this KIP.
>
> Guozhang
>
>
> On Mon, Jun 6, 2016 at 11:25 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > (+ Matthias)
> >
> > Hello Henry,
> >
> > Specifically to your question regarding Kafka Streams:
> >
> > 1. Currently restoreActiveState() is triggered in the onPartitionAssigned
> > callback, which is after the rebalance is completed from the coordinator's
> > point of view, and hence is covered in the process timeout value in this
> > new KIP.
> >
> > 2. That is a good question, and I think it is the general root cause of the
> > directory locking failures already reported by more than one use case.
> > Currently I believe the main reason a second rebalance is triggered
> > while the processors are still completing restoreActiveState() for the
> > previous rebalance is the session timeout (default 30 seconds). This
> > will happen much less often with a larger process timeout; however, with
> > complex topologies restoreActiveState() for all states may still
> > take a long time with tens / hundreds of state stores, and there are other
> > cases that can cause consumers to re-join the group right after a previous
> > rebalance, for example 1) regex subscription where the topic metadata has
> > changed, 2) consecutive consumer failures, or new consumers (i.e. new
> > KStream instances / threads) added.
> >
> > For such cases we can do a better job of "failing fast" if the consumer
> > detects another join is needed. I think in one of your local commits you
> > are already doing something similar, which we can merge back to trunk.
> >
> >
> >
> > Guozhang
> >
> >
> > On Sun, Jun 5, 2016 at 11:24 PM, Henry Cai <h...@pinterest.com.invalid>
> > wrote:
> >
> >> I have a question on the KIP about the long stall during
> >> ProcessorStateManager.restoreActiveState(). This can be a long stall when
> >> we need to rebuild the RocksDB state on a new node.
> >>
> >> 1. Is restoreActiveState() considered as post rebalance since this is
> >> invoked on application rebalance listener?
> >> 2. Suppose node A is spending a long time rebuilding the state in
> >> restoreActiveState() from the previous rebalance, and a new node (node B)
> >> sends a new JoinGroup request to the coordinator. How long should the
> >> coordinator wait for node A to finish restoreActiveState() from the
> >> previous rebalance? restoreActiveState() can take more than 10 minutes
> >> for a big state.
> >>
> >>
> >> On Sun, Jun 5, 2016 at 10:46 PM, Becket Qin <becket....@gmail.com>
> wrote:
> >>
> >> > Hi Jason,
> >> >
> >> > Thanks for this very useful KIP. In general I am with Guozhang on the
> >> > purpose of the three timeouts.
> >> > 1) session timeout for consumer liveness,
> >> > 2) process timeout (or maybe we should rename it to
> >> max.poll.interval.ms)
> >> > for application liveness,
> >> > 3) rebalance timeout for faster rebalance in some failure cases.
> >> >
> >> > It seems the current discussion is mainly about whether we need 3) as
> a
> >> > separate timeout or not. The current KIP proposal is to combine 2) and
> >> 3),
> >> > i.e. just use process timeout as rebalance timeout. That means we need to
> >> > either increase the rebalance timeout to let it adapt to the process timeout,
> >> > or the reverse. It would be helpful to understand the impact of these two
> >> > cases. Here are my two cents.
> >> >
> >> > For users who are consuming data from Kafka, usually they either care
> >> about
> >> > throughput or care about latency.
> >> >
> >> > If users care about the latency, they would probably care more about
> >> > average latency instead of 99.99 percentile latency which can be
> >> affected
> >> > by many other more common reasons other than consumer failure. Because
> >> all
> >> > the timeout we are discussing here only have impact on the 99.99
> >> percentile
> >> > latency, I don't think it would really make a difference for latency
> >> > sensitive users.
> >> >
> >> > The majority of the use cases for Kafka Connect and Mirror Maker are
> >> > throughput sensitive. Ewen raised a good example where Kafka Connect
> >> needs
> >> > to process the previous data on rebalance therefore requires a higher
> >> > rebalance timeout than process timeout. This is essentially the same
> in
> >> > Mirror Maker, where each rebalance needs to flush all the messages in
> >> the
> >> > accumulator in the producer. That could take some time depending on
> how
> >> > many messages are there. In this case, we may need to increase the
> >> process
> >> > timeout to make it the same as rebalance timeout. But this is probably
> >> > fine. The downside of increasing process timeout is a longer detection
> >> time
> >> > of a consumer failure.  Detecting a consumer failure a little later
> only
> >> > has limited impact because the rest of the consumers in the same group
> >> are
> >> > still working fine. So the total throughput is unlikely to drop
> >> > significantly. As long as the rebalance is not taking longer it should
> >> be
> >> > fine. The reason we care more about how fast rebalance can finish is
> >> > because during rebalance no consumer in the group is consuming, i.e.
> >> > throughput is zero. So we want to make the rebalance finish as quickly
> >> as
> >> > possible.
> >> >
> >> > Compared with increasing the process timeout to match the rebalance timeout,
> >> > the more common case seems to be a user who wants a longer process timeout but
> >> > a smaller rebalance timeout. I am more worried about this case, where we have to
> >> > shoehorn the rebalance timeout into the process timeout. For users who care about
> >> > throughput, that might cause the rebalance to take unnecessarily long.
> >> > Admittedly this only has an impact when a consumer has a problem during
> >> > rebalance, but depending on how long the process timeout is set, the
> >> > rebalance could potentially take forever, as Guozhang mentioned.
> >> >
> >> > I agree with Guozhang that we can start with 1) and 2) and add 3)
> later
> >> if
> >> > needed. But adding rebalance timeout is more involved than just
> adding a
> >> > configuration. That also means the rebalance has to be done in the
> >> > background heartbeat thread. Hence we have to synchronize rebalance
> and
> >> > consumer.poll() like we did in old consumer. Otherwise user may lose
> >> > messages if auto commit is enabled, or the manual commit might fail
> >> after a
> >> > consumer.poll() because the partitions might have been reassigned. So
> >> > having a separate rebalance timeout also potentially means a big
> change
> >> to
> >> > the users as well.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> >
> >> >
> >> > On Fri, Jun 3, 2016 at 11:45 AM, Jason Gustafson <ja...@confluent.io>
> >> > wrote:
> >> >
> >> > > Hey Ewen,
> >> > >
> >> > > I confess your comments caught me off guard. It never occurred to me
> >> that
> >> > > anyone would ask for a rebalance timeout so that it could be set
> >> _larger_
> >> > > than the process timeout. Even with buffered or batch processing, I
> >> would
> >> > > usually expect flushing before a rebalance to take no more time
> than a
> >> > > periodic flush. Otherwise, I'd probably try to see if there was some
> >> > > workload I could push into periodic flushes so that rebalances could
> >> > > complete faster. But supposing this isn't possible or practical in
> >> some
> >> > > cases, I'm wondering how limiting it would be in practice to have
> only
> >> > the
> >> > > one timeout in this case? I'm a little reluctant to add the
> additional
> >> > > timeout since I think most users would not have a strong need to
> keep
> >> a
> >> > > tight bound on normal processing time. (I'm also reminded that Jay
> >> > > mentioned he might have to dock everyone's pay 5% for each new
> >> timeout we
> >> > > introduce ;-)
> >> > >
> >> > > Thanks,
> >> > > Jason
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Thu, Jun 2, 2016 at 7:30 PM, Guozhang Wang <wangg...@gmail.com>
> >> > wrote:
> >> > >
> >> > > > Hi Ewen,
> >> > > >
> >> > > > I think you are right, the rebalance process could potentially
> >> involve
> >> > > all
> >> > > > the delayed compute / IO. More specifically, this is what I think
> of
> >> > the
> >> > > > rebalance process:
> >> > > >
> >> > > > 1. Coordinator decides to rebalance, start ticking based on
> >> rebalance
> >> > > time
> >> > > > out.
> >> > > > 2. Consumer realize rebalance needed when calling poll(); trigger
> >> > > > onPartitionRevoked().
> >> > > > 3. Consumer sends JoinGroupRequest;
> >> > > > 4. Coordinator send JoinGroupResponse; start ticking on the
> leader.
> >> > > > 5. Leader compute and send SyncGroupRequest
> >> > > > 6. Coordinator send SyncGroupResponse; start ticking on session
> >> > timeout.
> >> > > > 7. Consumer get new assignment; trigger onPartitionAssigned().
> >> > > >
> >> > > > In the above process: delayed compute / IO is usually done at step
> >> 2);
> >> > > > workload initialization is usually done in step 7); and some admin
> >> work
> >> > > > (like in Kafka Streams) are likely to be done in step 5). As in
> the
> >> > > current
> >> > > > KIP proposal the rebalance timeout on the coordinator start
> ticking
> >> on
> >> > 1)
> >> > > > on everyone in the group, and stop ticking on 3); it start ticking
> >> on
> >> > > > leader again on step 4), and stop upon step 5). In this case the
> >> > delayed
> >> > > > compute / IO contained in step 2) is covered by this rebalance
> >> timeout.
> >> > > >
> >> > > > That being said, I think for "worst case", the time of processing
> a
> >> > > single
> >> > > > record would still be similar to rebalancing, since both of which
> >> could
> >> > > > result in completing all delayed compute / IO so far. And since
> >> > > "processing
> >> > > > timeout" is used to cover the worst case, it should be still OK?
> >> > > >
> >> > > >
> >> > > > Guozhang
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Thu, Jun 2, 2016 at 5:55 PM, Ewen Cheslack-Postava <
> >> > e...@confluent.io
> >> > > >
> >> > > > wrote:
> >> > > >
> >> > > > > Jason,
> >> > > > >
> >> > > > > I've been thinking about this more in terms of something like
> >> > Connect.
> >> > > I
> >> > > > > think the rebalance timeout may be a bit different from the
> >> process
> >> > > > > timeout, and even the process timeout is a bit of a misnomer.
> >> > > > >
> >> > > > > We sort of talk about the process timeout as if it can be an
> >> > indicator
> >> > > of
> >> > > > > maximum processing time for a record/batch. This makes sense
> for a
> >> > case
> >> > > > of
> >> > > > > a data-dependent load (i.e. you can only load some data from
> slow
> >> > > storage
> >> > > > > after seeing some data) where that load might be very large
> >> compared
> >> > to
> >> > > > > normal processing time. It also makes sense if you have auto
> >> commit
> >> > > > enabled
> >> > > > > because you need to be completely finished processing the data
> >> before
> >> > > > > calling poll() again, so that time before you call another
> >> consumer
> >> > API
> >> > > > > actually reflects processing time.
> >> > > > >
> >> > > > > It might makes less sense in cases like streams (or any other
> app)
> >> > that
> >> > > > > batch writes to disk, or connectors that "process" a message by
> >> > > enqueuing
> >> > > > > the data, but won't commit offsets until data is flushed,
> possibly
> >> > > during
> >> > > > > some other, much later iteration of processing. In this case I
> >> think
> >> > > > > processing time and rebalance time could potentially differ
> >> > > > significantly.
> >> > > > > During normal processing, you can potentially pipeline quite a
> >> bit,
> >> > > > > buffering up changes, flushing as needed, but then only
> committing
> >> > once
> >> > > > > flushing is complete. But rebalancing is different then -- you
> >> *must*
> >> > > > > finish flushing all the data or manually choose to discard the
> >> data
> >> > > > > (presumably by doing something like watching for the process
> >> timeout
> >> > > you
> >> > > > > set and bailing early, only committing the offsets for data
> you've
> >> > > > > flushed). If you have lots of data built up, the cost for
> >> rebalancing
> >> > > > could
> >> > > > > be a *lot* higher than the maximum time you would otherwise see
> >> > between
> >> > > > > calls to consumer APIs to indicate processing progress.
> >> > > > >
> >> > > > > The thing that makes these cases different is that processing
> >> isn't
> >> > > > > actually tied to calls to the consumer API. You can queue up /
> >> > > pipeline /
> >> > > > > defer some of the work. (By the way, this is currently a
> >> limitation
> >> > of
> >> > > > sink
> >> > > > > connectors that I'm not thrilled about -- offset commit
> requires a
> >> > full
> >> > > > > flush, whereas some coordination with the sink connector to not
> >> > > require a
> >> > > > > full flush except on rebalances would be much nicer, albeit more
> >> > > > difficult
> >> > > > > for sink connectors to implement.)
> >> > > > >
> >> > > > > -Ewen
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > On Thu, Jun 2, 2016 at 5:14 PM, Jason Gustafson <
> >> ja...@confluent.io>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hey Guozhang,
> >> > > > > >
> >> > > > > > I'm actually not too concerned about the time spent in the
> >> > rebalance
> >> > > > > > callback specifically. Both it and regular processing time in
> >> the
> >> > > poll
> >> > > > > loop
> >> > > > > > will delay the rebalance and keep joined consumers idle.
> >> However,
> >> > if
> >> > > we
> >> > > > > > expose the rebalance timeout, then it would give users the
> >> option
> >> > to
> >> > > > > > effectively disable the process timeout while still keeping a
> >> maximum
> >> > > > bound
> >> > > > > > on the rebalance time. If the consumer cannot complete its
> >> > processing
> >> > > > > fast
> >> > > > > > enough and rejoin, then it would be evicted. This provides
> >> > something
> >> > > > like
> >> > > > > > (2) since the other consumers in the group would be able to
> >> > complete
> >> > > > the
> >> > > > > > rebalance and resume work while the evicted consumer would
> have
> >> to
> >> > > > > rollback
> >> > > > > > progress. This is not too different from rebalancing in the
> >> > > background
> >> > > > > > which also typically would cause commit failure and rollback
> >> > (though
> >> > > at
> >> > > > > > least the consumer stays in the group).
> >> > > > > >
> >> > > > > > Now that I'm thinking about it more, I'm not sure this would
> be
> >> a
> >> > > great
> >> > > > > > facility to depend on in practice. It might be OK if just one
> or
> >> > two
> >> > > of
> >> > > > > the
> >> > > > > > consumers fall out of the group during the rebalance, but if
> >> half
> >> > the
> >> > > > > group
> >> > > > > > is regularly getting evicted, it would be a problem. So even
> if
> >> we
> >> > > > expose
> >> > > > > > the rebalance timeout, the user is still going to have to set
> it
> >> > with
> >> > > > > some
> >> > > > > > idea in mind about how long processing should take.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Jason
> >> > > > > >
> >> > > > > > On Thu, Jun 2, 2016 at 2:46 PM, Guozhang Wang <
> >> wangg...@gmail.com>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi Jason,
> >> > > > > > >
> >> > > > > > > With the current usage pattern of:
> >> > > > > > >
> >> > > > > > > while(..) {
> >> > > > > > >
> >> > > > > > >   consumer.poll(/* where rebalance happens */)
> >> > > > > > >
> >> > > > > > >   // process messages
> >> > > > > > > }
> >> > > > > > >
> >> > > > > > > ----------
> >> > > > > > >
> >> > > > > > > And since rebalance is still on the caller thread, not the
> >> > > background
> >> > > > > > > thread, if coordinator decides to rebalance while user
> thread
> >> is
> >> > > > still
> >> > > > > on
> >> > > > > > > processing messages, there is no option: we are forced
> to
> >> go
> >> > > with
> >> > > > > 1)
> >> > > > > > > right? I think your / Onur's point here, which I agree with,
> is
> >> > that
> >> > > > by
> >> > > > > > > reusing process timeout as rebalance timeout, if the
> rebalance
> >> > > > callback
> >> > > > > > > could take longer time than processing a batch, users need
> to
> >> set
> >> > > the
> >> > > > > > > timeout value to the higher of the two, hence the callback
> >> > latency,
> >> > > > > which
> >> > > > > > > will make detection of processing stallness less effective,
> >> > right?
> >> > > > > > >
> >> > > > > > > As I mentioned in my previous email, I feel that this case
> of
> >> > > > > "callback
> >> > > > > > > function time taking longer than processing a batch" would
> not
> >> be
> >> > > > > frequent
> >> > > > > > > in practice, and the processing timeout would usually be a
> >> good
> >> > > > higher
> >> > > > > > > bound on the callback function latency. If that is true, I'd
> >> > > suggest
> >> > > > we
> >> > > > > > > keep the current proposal and not add a third timeout config
> >> for
> >> > > > > covering
> >> > > > > > > this case.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Guozhang
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Thu, Jun 2, 2016 at 10:40 AM, Jason Gustafson <
> >> > > ja...@confluent.io
> >> > > > >
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hey Guozhang,
> >> > > > > > > >
> >> > > > > > > > I think the problem is that users may not want to
> sacrifice
> >> > > > rebalance
> >> > > > > > > > latency because of uncertainty around processing time. As
> >> soon
> >> > > as a
> >> > > > > > > > rebalance begins, there are basically two choices:
> >> > > > > > > >
> >> > > > > > > > 1. Block the rebalance until all consumers have finished
> >> their
> >> > > > > current
> >> > > > > > > > processing.
> >> > > > > > > > 2. Let all consumers rebalance and "rollback" any
> processing
> >> > that
> >> > > > > could
> >> > > > > > > not
> >> > > > > > > > be committed before the rebalance completes.
> >> > > > > > > >
> >> > > > > > > > If you choose option (1), then you have an incentive to
> >> keep a
> >> > > > > > relatively
> >> > > > > > > > tight bound on process.timeout.ms in order to reduce the
> >> > > > worst-case
> >> > > > > > idle
> >> > > > > > > > time during a rebalance. But if you fail to set it high
> >> enough,
> >> > > > then
> >> > > > > > > you'll
> >> > > > > > > > get spurious rebalances during normal processing. I think
> >> Onur
> >> > is
> >> > > > > > saying
> >> > > > > > > > that this still sort of sucks for users. On the other
> hand,
> >> if
> >> > > (2)
> >> > > > is
> >> > > > > > > > acceptable, then users will have more freedom to err on
> the
> >> > high
> >> > > > side
> >> > > > > > > when
> >> > > > > > > > setting process.timeout.ms, or even disable it entirely.
> >> They
> >> > > will
> >> > > > > > have
> >> > > > > > > to
> >> > > > > > > > deal with rolling back any progress which cannot be
> >> committed
> >> > > after
> >> > > > > the
> >> > > > > > > > rebalance completes, but maybe this is less of a problem
> for
> >> > some
> >> > > > > > users?
> >> > > > > > > >
> >> > > > > > > > Thanks,
> >> > > > > > > > Jason
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > On Wed, Jun 1, 2016 at 10:23 PM, Guozhang Wang <
> >> > > wangg...@gmail.com
> >> > > > >
> >> > > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > Hi Onur, Jason:
> >> > > > > > > > >
> >> > > > > > > > > Here are some thoughts about reusing process timeout as
> >> > > > server-side
> >> > > > > > > > > rebalance timeout: First of all, my understanding is
> that
> >> > > > > > > > >
> >> > > > > > > > > 1) session timeout is for detecting consumer crash /
> hard
> >> > > > failures
> >> > > > > > (in
> >> > > > > > > > this
> >> > > > > > > > > case the heartbeat thread will be dead as well, hence
> >> > > coordinator
> >> > > > > > > > realized
> >> > > > > > > > > within session timeout value).
> >> > > > > > > > >
> >> > > > > > > > > 2) process timeout is for checking liveness of the user
> >> > thread
> >> > > > that
> >> > > > > > > calls
> >> > > > > > > > > the consumer as well as does the processing: when no
> >> consumer
> >> > > > calls
> >> > > > > > are
> >> > > > > > > > > made within the process timeout, heartbeat thread stop
> >> > working
> >> > > > and
> >> > > > > > > hence
> >> > > > > > > > it
> >> > > > > > > > > will be detected by coordinator.
> >> > > > > > > > >
> >> > > > > > > > > 3) a potential server-side rebalance timeout would be
> >> used to
> >> > > > > detect
> >> > > > > > > > > consumer liveness during the rebalance period, in which
> >> the
> >> > > user
> >> > > > > > thread
> >> > > > > > > > is
> >> > > > > > > > > tied with the "poll" call and also the callback
> function,
> >> to
> >> > > > > prevent
> >> > > > > > a
> >> > > > > > > > slow
> >> > > > > > > > > / stalled consumer in their rebalance callback to cause
> >> the
> >> > > > > rebalance
> >> > > > > > > > > taking forever.
> >> > > > > > > > >
> >> > > > > > > > > I think we generally have two cases in practice
> regarding
> >> 3)
> >> > > > above:
> >> > > > > > > user
> >> > > > > > > > > either does almost nothing and hence should never be
> >> stalled
> >> > > > > (unless
> >> > > > > > > > there
> >> > > > > > > > > is a long GC), or they do various external IOs for
> >> > maintaining
> >> > > > > their
> >> > > > > > > own
> >> > > > > > > > > states, for example, which could be taking long or even
> >> cause
> >> > > the
> >> > > > > > > thread
> >> > > > > > > > to
> >> > > > > > > > > stall. We do not need to worry too much about the former
> >> > case,
> >> > > > and
> >> > > > > as
> >> > > > > > > for
> >> > > > > > > > > the latter case, the process timeout value should usually
> >> be a
> >> > > good
> >> > > > > > > higher
> >> > > > > > > > > bound on the rebalance latency.
> >> > > > > > > > >
> >> > > > > > > > > That being said, if we observe that there is indeed a
> >> common
> >> > > > usage
> >> > > > > > > where
> >> > > > > > > > 2)
> >> > > > > > > > > and 3) would require very different timeout values which
> >> > > > outweighs
> >> > > > > > the
> >> > > > > > > > > complexity of three timeout values, we can consider
> >> adding a
> >> > > > third
> >> > > > > > one
> >> > > > > > > > > then: it is easier to add more configs later.
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > What do you think?
> >> > > > > > > > >
> >> > > > > > > > > Guozhang
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > On Tue, May 31, 2016 at 2:35 PM, Jason Gustafson <
> >> > > > > ja...@confluent.io
> >> > > > > > >
> >> > > > > > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > > > Hey Onur,
> >> > > > > > > > > >
> >> > > > > > > > > > Thanks for the detailed response. I think the problem
> of
> >> > > > > > controlling
> >> > > > > > > > > > rebalance times is the main (known) gap in the
> proposal
> >> as
> >> > it
> >> > > > > > stands.
> >> > > > > > > > > >
> >> > > > > > > > > > This burden goes away if you loosen the liveness
> >> property
> >> > by
> >> > > > > > having a
> >> > > > > > > > > > > required rebalance time and optional processing time
> >> > where
> >> > > > > > > rebalance
> >> > > > > > > > > > > happens in the background thread as stated in the
> KIP.
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > Just to clarify, the current KIP only allows
> rebalances
> >> to
> >> > > > > complete
> >> > > > > > > in
> >> > > > > > > > > the
> >> > > > > > > > > > foreground. What I suggested above in reply to Grant
> was
> >> > that
> >> > > > we
> >> > > > > > > could
> >> > > > > > > > > add
> >> > > > > > > > > > a separate rebalance timeout setting, the behavior I
> >> had in
> >> > > > mind
> >> > > > > > was
> >> > > > > > > to
> >> > > > > > > > > let
> >> > > > > > > > > > the consumer fall out of the group if the timeout is
> >> > reached
> >> > > > > while
> >> > > > > > > the
> >> > > > > > > > > > consumer is still processing. I was specifically
> trying
> >> to
> >> > > > avoid
> >> > > > > > > moving
> >> > > > > > > > > the
> >> > > > > > > > > > rebalance to the background thread since this
> >> significantly
> >> > > > > > increases
> >> > > > > > > > the
> >> > > > > > > > > > complexity of the implementation. We'd also have to
> >> think
> >> > > about
> >> > > > > > > > > > compatibility a bit more. For example, what are the
> >> > > > implications
> >> > > > > of
> >> > > > > > > > > having
> >> > > > > > > > > > the rebalance listener execute in a separate thread?
> >> > > > > > > > > >
> >> > > > > > > > > > Putting that issue aside, I think we need to convince
> >> > > ourselves
> >> > > > > > that
> >> > > > > > > a
> >> > > > > > > > > > separate rebalance timeout is really necessary since
> >> every
> >> > > new
> >> > > > > > > timeout
> >> > > > > > > > > adds
> >> > > > > > > > > > some conceptual noise which all users will see. My
> >> thought
> >> > in
> >> > > > > this
> >> > > > > > > KIP
> >> > > > > > > > > was
> >> > > > > > > > > > that users who didn't want the burden of tuning the
> >> process
> >> > > > > timeout
> >> > > > > > > > could
> >> > > > > > > > > > use a relatively large value without a major impact
> >> because
> >> > > > group
> >> > > > > > > > > > rebalances themselves will typically be infrequent.
> The
> >> > main
> >> > > > > > concern
> >> > > > > > > is
> >> > > > > > > > > for
> >> > > > > > > > > > users who have highly variant processing times and
> want
> >> to
> >> > > > > ensure a
> >> > > > > > > > tight
> >> > > > > > > > > > bound on rebalance times (even if it means having to
> >> > discard
> >> > > > some
> >> > > > > > > > > > processing that cannot be completed before the
> rebalance
> >> > > > > finishes).
> >> > > > > > > > These
> >> > > > > > > > > > users will be left trying to tune process.timeout.ms
> >> and
> >> > > > > > > > > max.poll.records,
> >> > > > > > > > > > which is basically the same position they are
> currently
> >> in.
> >> > > The
> >> > > > > > > problem
> >> > > > > > > > > is
> >> > > > > > > > > > I don't know how common this case is, so I'm not sure
> >> how
> >> > it
> >> > > > > weighs
> >> > > > > > > > > against
> >> > > > > > > > > > the cost of having an additional timeout that needs to
> >> be
> >> > > > > > explained.
> >> > > > > > > We
> >> > > > > > > > > can
> >> > > > > > > > > > always add the rebalance timeout later, but it will
> >> be
> >> > > tough
> >> > > > > to
> >> > > > > > > > remove
> >> > > > > > > > > > once it's there. All the same, I'm not that keen on
> >> another
> >> > > > > > iteration
> >> > > > > > > > of
> >> > > > > > > > > > this problem, so if we believe this use case is common
> >> > > enough,
> >> > > > > then
> >> > > > > > > > maybe
> >> > > > > > > > > > we should add it now.
> >> > > > > > > > > >
> >> > > > > > > > > > Thanks,
> >> > > > > > > > > > Jason
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > On Sat, May 28, 2016 at 3:10 AM, Onur Karaman <
> >> > > > > > > > > > onurkaraman.apa...@gmail.com>
> >> > > > > > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > Thanks for the KIP writeup, Jason.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Before anything else, I just wanted to point out
> that
> >> > it's
> >> > > > > worth
> >> > > > > > > > > > mentioning
> >> > > > > > > > > > > the "heartbeat.interval.ms" consumer config in the
> >> KIP
> >> > for
> >> > > > > > > > > completeness.
> >> > > > > > > > > > > Today this config only starts to kick in if poll is
> >> > called
> >> > > > > > > frequently
> >> > > > > > > > > > > enough. A separate heartbeat thread should make this
> >> > config
> >> > > > > > behave
> >> > > > > > > > more
> >> > > > > > > > > > > like what people would expect: a separate thread
> >> sending
> >> > > > > > heartbeats
> >> > > > > > > > at
> >> > > > > > > > > > the
> >> > > > > > > > > > > configured interval.
> >> > > > > > > > > > >
> >> > > > > > > > > > > With this KIP, the relevant configs become:
> >> > > > > > > > > > > "max.poll.records" - already exists
> >> > > > > > > > > > > "session.timeout.ms" - already exists
> >> > > > > > > > > > > "heartbeat.interval.ms" - already exists
> >> > > > > > > > > > > "process.timeout.ms" - new
> >> > > > > > > > > > >
> >> > > > > > > > > > > After reading the KIP several times, I think it
> would
> >> be
> >> > > > > helpful
> >> > > > > > to
> >> > > > > > > > be
> >> > > > > > > > > > more
> >> > > > > > > > > > > explicit in the desired outcome. Is it trying to
> make
> >> > > faster
> >> > > > > > > > > > > best/average/worst case rebalance times? Is it
> trying
> >> to
> >> > > make
> >> > > > > the
> >> > > > > > > > > clients
> >> > > > > > > > > > > need less configuration tuning?
> >> > > > > > > > > > >
> >> > > > > > > > > > > Also it seems that brokers probably still want to
> >> enforce
> >> > > > > minimum
> >> > > > > > > and
> >> > > > > > > > > > > maximum rebalance timeouts just as with the minimum
> >> and
> >> > > > maximum
> >> > > > > > > > session
> >> > > > > > > > > > > timeouts so DelayedJoins don't stay in purgatory
> >> > > > indefinitely.
> >> > > > > So
> >> > > > > > > > we'd
> >> > > > > > > > > > add
> >> > > > > > > > > > > new "group.min.rebalance.timeout.ms" and
> >> > > > > > > > > > > "group.max.rebalance.timeout.ms"
> >> > > > > > > > > > > broker configs which again might need to be brought
> >> up in
> >> > > the
> >> > > > > > KIP.
> >> > > > > > > > > Let's
> >> > > > > > > > > > > say we add these bounds. A side-effect of having
> >> > > broker-side
> >> > > > > > bounds
> >> > > > > > > > on
> >> > > > > > > > > > > rebalance timeouts in combination with Java clients
> >> that
> >> > > > make
> >> > > > > > > > process
> >> > > > > > > > > > > timeouts the same as rebalance timeouts is that the
> >> > broker
> >> > > > > > > > effectively
> >> > > > > > > > > > > dictates the max processing time allowed between
> poll
> >> > > calls.
> >> > > > > This
> >> > > > > > > > > gotcha
> >> > > > > > > > > > > exists right now with today's broker-side bounds on
> >> > session
> >> > > > > > > timeouts.
> >> > > > > > > > > So
> >> > > > > > > > > > > I'm not really convinced that the proposal gets rid
> of
> >> > this
> >> > > > > > > > > complication
> >> > > > > > > > > > > mentioned in the KIP.
> >> > > > > > > > > > >
> >> > > > > > > > > > > I think the main question to ask is: does the KIP
> >> > actually
> >> > > > > make a
> >> > > > > > > > > > > difference?
> >> > > > > > > > > > >
> >> > > > > > > > > > > It looks like this KIP improves rebalance times
> >> > > specifically
> >> > > > > when
> >> > > > > > > the
> >> > > > > > > > > > > client currently has processing times large enough
> to
> >> > force
> >> > > > > > larger
> >> > > > > > > > > > session
> >> > > > > > > > > > > timeouts and heartbeat intervals to not be honored.
> >> > > > Separating
> >> > > > > > > > session
> >> > > > > > > > > > > timeouts from processing time means clients can keep
> >> > > > > > > > > > > their "session.timeout.ms" low so the coordinator can
> >> quickly
> >> > > > detect
> >> > > > > > > > process
> >> > > > > > > > > > > failure, and honoring a low "heartbeat.interval.ms"
> >> on
> >> > the
> >> > > > > > > separate
> >> > > > > > > > > > > heartbeat thread means clients will be quickly
> >> notified
> >> > of
> >> > > > > group
> >> > > > > > > > > > membership
> >> > > > > > > > > > > and subscription changes - all without placing
> >> difficult
> >> > > > > > > expectations
> >> > > > > > > > > on
> >> > > > > > > > > > > processing time. But even so, rebalancing through
> the
> >> > > calling
> >> > > > > > > thread
> >> > > > > > > > > > means
> >> > > > > > > > > > > the slowest processing client in the group will
> still
> >> be
> >> > > the
> >> > > > > rate
> >> > > > > > > > > > limiting
> >> > > > > > > > > > > step when looking at rebalance times.
> >> > > > > > > > > > >
> >> > > > > > > > > > > From a usability perspective, the burden still seems
> >> like
> >> > > it
> >> > > > > will
> >> > > > > > > be
> >> > > > > > > > > > tuning
> >> > > > > > > > > > > the processing time to keep the "progress liveness"
> >> happy
> >> > > > > during
> >> > > > > > > > > > rebalances
> >> > > > > > > > > > > while still having reasonable upper bounds on
> >> rebalance
> >> > > > times.
> >> > > > > It
> >> > > > > > > > still
> >> > > > > > > > > > > looks like users have to do almost the exact same
> >> tricks
> >> > as
> >> > > > > today
> >> > > > > > > > when
> >> > > > > > > > > > the
> >> > > > > > > > > > > group membership changes due to slow processing times
> >> even
> >> > > > though
> >> > > > > > all
> >> > > > > > > > the
> >> > > > > > > > > > > consumers are alive and the topics haven't changed:
> >> > > > > > > > > > > 1. Increase the rebalance timeout to give more time
> >> for
> >> > > > record
> >> > > > > > > > > processing
> >> > > > > > > > > > > (the difference compared to today is that we bump
> the
> >> > > > rebalance
> >> > > > > > > > timeout
> >> > > > > > > > > > > instead of session timeout).
> >> > > > > > > > > > > 2. Reduce the number of records handled on each
> >> iteration
> >> > > > with
> >> > > > > > > > > > > max.poll.records.
> >> > > > > > > > > > >
> >> > > > > > > > > > > This burden goes away if you loosen the liveness
> >> property
> >> > > by
> >> > > > > > > having a
> >> > > > > > > > > > > required rebalance time and optional processing time
> >> > where
> >> > > > > > > rebalance
> >> > > > > > > > > > > happens in the background thread as stated in the
> KIP.
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Thu, May 26, 2016 at 12:40 PM, Jason Gustafson <
> >> > > > > > > > ja...@confluent.io>
> >> > > > > > > > > > > wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > > > Hey Grant,
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Thanks for the feedback. I'm definitely open to
> >> > including
> >> > > > > > > > heartbeat()
> >> > > > > > > > > > in
> >> > > > > > > > > > > > this KIP. One thing we should be clear about is
> what
> >> > the
> >> > > > > > behavior
> >> > > > > > > > of
> >> > > > > > > > > > > > heartbeat() should be when the group begins
> >> > rebalancing.
> >> > > I
> >> > > > > > think
> >> > > > > > > > > there
> >> > > > > > > > > > > are
> >> > > > > > > > > > > > basically two options:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > 1. heartbeat() simply keeps heartbeating even if
> the
> >> > > group
> >> > > > > has
> >> > > > > > > > > started
> >> > > > > > > > > > > > rebalancing.
> >> > > > > > > > > > > > 2. heartbeat() completes the rebalance itself.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > With the first option, when processing takes
> longer
> >> > than
> >> > > > the
> >> > > > > > > > > rebalance
> >> > > > > > > > > > > > timeout, the member will fall out of the group
> which
> >> > will
> >> > > > > cause
> >> > > > > > > an
> >> > > > > > > > > > offset
> >> > > > > > > > > > > > commit failure when it finally finishes. However,
> if
> >> > > > > processing
> >> > > > > > > > > > finishes
> >> > > > > > > > > > > > before the rebalance completes, then offsets can
> >> still
> >> > be
> >> > > > > > > > committed.
> >> > > > > > > > > On
> >> > > > > > > > > > > the
> >> > > > > > > > > > > > other hand, if heartbeat() completes the rebalance
> >> > > itself,
> >> > > > > then
> >> > > > > > > > > you'll
> >> > > > > > > > > > > > definitely see the offset commit failure for any
> >> > records
> >> > > > > being
> >> > > > > > > > > > processed.
> >> > > > > > > > > > > > So the first option is sort of biased toward
> >> processing
> >> > > > > > > completion
> >> > > > > > > > > > while
> >> > > > > > > > > > > > the latter is biased toward rebalance completion.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I'm definitely not a fan of the second option since it
> >> > takes
> >> > > > away
> >> > > > > > the
> >> > > > > > > > > > choice
> >> > > > > > > > > > > to
> >> > > > > > > > > > > > finish processing before rejoining. However, I do
> >> see
> >> > > some
> >> > > > > > > benefit
> >> > > > > > > > in
> >> > > > > > > > > > the
> >> > > > > > > > > > > > first option if the user wants to keep rebalance
> >> time
> >> > low
> >> > > > and
> >> > > > > > > > doesn't
> >> > > > > > > > > > > mind
> >> > > > > > > > > > > > being kicked out of the group if processing takes
> >> > longer
> >> > > > > > during a
> >> > > > > > > > > > > > rebalance. This may be a reasonable tradeoff since
> >> > > consumer
> >> > > > > > > groups
> >> > > > > > > > > are
> >> > > > > > > > > > > > presumed to be stable most of the time. A better
> >> option
> >> > > in
> >> > > > > that
> >> > > > > > > > case
> >> > > > > > > > > > > might
> >> > > > > > > > > > > > be to expose the rebalance timeout to the user
> >> directly
> >> > > > since
> >> > > > > > it
> >> > > > > > > > > would
> >> > > > > > > > > > > > allow the user to use an essentially unbounded
> >> > > > > > > process.timeout.ms
> >> > > > > > > > > for
> >> > > > > > > > > > > > highly variant processing while still keeping
> >> rebalance
> >> > > > time
> >> > > > > > > > limited.
> >> > > > > > > > > > Of
> >> > > > > > > > > > > > course, it would be another timeout for the user
> to
> >> > > > > > understand...
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Thanks,
> >> > > > > > > > > > > > Jason
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On Thu, May 26, 2016 at 8:19 AM, Grant Henke <
> >> > > > > > > ghe...@cloudera.com>
> >> > > > > > > > > > > wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > Hi Jason,
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Thanks for writing up a proposal (and a thorough
> >> > one)!
> >> > > > This
> >> > > > > > is
> >> > > > > > > > > > > something
> >> > > > > > > > > > > > > that I had been thinking about this week too as
> I
> >> > have
> >> > > > run
> >> > > > > > into
> >> > > > > > > > it
> >> > > > > > > > > > more
> >> > > > > > > > > > > > > than a handful of times now.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > I like the idea of having a larger processing
> >> > timeout;
> >> > > > that
> >> > > > > > > > timeout
> >> > > > > > > > > > in
> >> > > > > > > > > > > > > unison with max.poll.records should in many
> cases
> >> > > > provide a
> >> > > > > > > > > > reasonable
> >> > > > > > > > > > > > > assurance that the consumer will stay alive.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > In rejected alternatives "Add a separate API the
> >> user
> >> > > can
> >> > > > > > call
> >> > > > > > > to
> >> > > > > > > > > > > > indicate
> >> > > > > > > > > > > > > liveness" is listed. I think a heartbeat api
> >> could be
> >> > > > added
> >> > > > > > > along
> >> > > > > > > > > > with
> >> > > > > > > > > > > > > these new timeout configurations and used for
> >> > > "advanced"
> >> > > > > use
> >> > > > > > > > cases
> >> > > > > > > > > > > where
> >> > > > > > > > > > > > > the processing time could be highly variant and
> >> less
> >> > > > > > > > predictable. I
> >> > > > > > > > > > > > think a
> >> > > > > > > > > > > > > place where we might use the heartbeat api in
> >> Kafka
> >> > is
> >> > > > > > > > MirrorMaker.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Today, I have seen people trying to find ways to
> >> > > leverage
> >> > > > > the
> >> > > > > > > > > > existing
> >> > > > > > > > > > > > api
> >> > > > > > > > > > > > > to "force" heartbeats by:
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > 1. Calling poll to get the batch of records to
> >> > process
> >> > > > > > > > > > > > > 2. Call pause on all partitions
> >> > > > > > > > > > > > > 3. Process the record batch
> >> > > > > > > > > > > > > 3a. While processing periodically call poll
> >> (which is
> >> > > > > > > essentially
> >> > > > > > > > > > just
> >> > > > > > > > > > > > > heartbeat since it returns no records and is
> >> paused)
> >> > > > > > > > > > > > > 4. Commit offsets and un-pause
> >> > > > > > > > > > > > > 5. Repeat from 1
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Thanks,
> >> > > > > > > > > > > > > Grant
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > On Wed, May 25, 2016 at 6:32 PM, Jason
> Gustafson <
> >> > > > > > > > > ja...@confluent.io
> >> > > > > > > > > > >
> >> > > > > > > > > > > > > wrote:
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > > Hi All,
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > One of the persistent problems we see with the
> >> new
> >> > > > > consumer
> >> > > > > > > is
> >> > > > > > > > > the
> >> > > > > > > > > > > use
> >> > > > > > > > > > > > of
> >> > > > > > > > > > > > > > the session timeout in order to ensure
> progress.
> >> > > > Whenever
> >> > > > > > > there
> >> > > > > > > > > is
> >> > > > > > > > > > a
> >> > > > > > > > > > > > > delay
> >> > > > > > > > > > > > > > in message processing which exceeds the
> session
> >> > > > timeout,
> >> > > > > no
> >> > > > > > > > > > > heartbeats
> >> > > > > > > > > > > > > can
> >> > > > > > > > > > > > > > be sent and the consumer is removed from the
> >> group.
> >> > > We
> >> > > > > seem
> >> > > > > > > to
> >> > > > > > > > > hit
> >> > > > > > > > > > > this
> >> > > > > > > > > > > > > > problem everywhere the consumer is used
> >> (including
> >> > > > Kafka
> >> > > > > > > > Connect
> >> > > > > > > > > > and
> >> > > > > > > > > > > > > Kafka
> >> > > > > > > > > > > > > > Streams) and we don't always have a great
> >> solution.
> >> > > > I've
> >> > > > > > > > written
> >> > > > > > > > > a
> >> > > > > > > > > > > KIP
> >> > > > > > > > > > > > to
> >> > > > > > > > > > > > > > address this problem here:
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> >> > > > > > > > > > > > > > .
> >> > > > > > > > > > > > > > Have a look and let me know what you think.
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > > > Thanks,
> >> > > > > > > > > > > > > > Jason
> >> > > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > --
> >> > > > > > > > > > > > > Grant Henke
> >> > > > > > > > > > > > > Software Engineer | Cloudera
> >> > > > > > > > > > > > > gr...@cloudera.com | twitter.com/gchenke |
> >> > > > > > > > > > linkedin.com/in/granthenke
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > --
> >> > > > > > > > > -- Guozhang
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > --
> >> > > > > > > -- Guozhang
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > --
> >> > > > > Thanks,
> >> > > > > Ewen
> >> > > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > -- Guozhang
> >> > > >
> >> > >
> >> >
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>
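
For reference, the pause/poll workaround that Grant lists in the quoted thread (steps 1 through 5) could look roughly like the sketch below. This is only an illustration under stated assumptions: FakeConsumer, run_loop and heartbeat_every are hypothetical names invented here, and FakeConsumer is an in-memory stand-in rather than any real Kafka client API. It only shows the control flow, in which a poll on paused partitions returns no records and serves purely as a heartbeat.

```python
class FakeConsumer:
    """Hypothetical in-memory stand-in for a consumer; not a real Kafka client."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.paused = False
        self.heartbeat_polls = 0
        self.committed = []

    def poll(self):
        # While paused, poll returns no records; it only counts as a heartbeat.
        if self.paused:
            self.heartbeat_polls += 1
            return []
        return self._batches.pop(0) if self._batches else []

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False

    def commit(self, records):
        self.committed.extend(records)


def run_loop(consumer, process, heartbeat_every=2):
    """Run the workaround loop until poll returns an empty batch."""
    while True:
        records = consumer.poll()              # 1. poll to get a batch of records
        if not records:
            break
        consumer.pause()                       # 2. pause all partitions
        for i, record in enumerate(records, start=1):
            process(record)                    # 3. process the record batch
            if i % heartbeat_every == 0:
                consumer.poll()                # 3a. paused poll acts as a heartbeat
        consumer.commit(records)               # 4. commit offsets and un-pause
        consumer.resume()
    return consumer                            # 5. the while loop repeats from 1


if __name__ == "__main__":
    handled = []
    c = run_loop(FakeConsumer([[1, 2, 3, 4], [5, 6]]), handled.append)
    print(handled, c.committed, c.heartbeat_polls)
```

The sketch also makes the cost of the workaround visible: the application has to interleave liveness calls with its own processing, which is the burden a background heartbeat thread is meant to remove.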
