I have a question on the KIP about the long stall during
ProcessorStateManager.restoreActiveState(): this can be a long stall when
we need to rebuild the RocksDB state on a new node.

1. Is restoreActiveState() considered post-rebalance work, since it is
invoked from the application's rebalance listener?
2. When node A is spending a long time rebuilding the state in
restoreActiveState() from the previous rebalance, and a new node (node B)
sends a new JoinGroup request to the coordinator, how long should the
coordinator wait for node A to finish the restoreActiveState from the
previous rebalance? The restoreActiveState can take more than 10 minutes
for a big state.
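
For context, here is a rough sketch of what the restore step conceptually
does (names are illustrative, not the actual ProcessorStateManager code;
restoreConsumer is assumed to be an already-configured
KafkaConsumer<byte[], byte[]>): it replays the store's changelog partition
from the beginning into local storage, so the stall grows with the size of
the state.

    Map<ByteBuffer, byte[]> store = new HashMap<>();  // stand-in for the RocksDB store
    TopicPartition changelog = new TopicPartition("app-my-store-changelog", 0); // hypothetical changelog

    restoreConsumer.assign(Collections.singletonList(changelog));
    restoreConsumer.seekToBeginning(Collections.singletonList(changelog));

    boolean caughtUp = false;
    while (!caughtUp) {
        ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(100);
        for (ConsumerRecord<byte[], byte[]> record : records) {
            // every changelog record is re-applied locally, one at a time
            store.put(ByteBuffer.wrap(record.key()), record.value());
        }
        caughtUp = records.isEmpty();  // simplistic end-of-restore check for this sketch
    }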


On Sun, Jun 5, 2016 at 10:46 PM, Becket Qin <becket....@gmail.com> wrote:

> Hi Jason,
>
> Thanks for this very useful KIP. In general I am with Guozhang on the
> purpose of the three timeouts:
> 1) session timeout for consumer liveness,
> 2) process timeout (or maybe we should rename it to max.poll.interval.ms)
> for application liveness,
> 3) rebalance timeout for faster rebalance in some failure cases.
>
> It seems the current discussion is mainly about whether we need 3) as a
> separate timeout or not. The current KIP proposal is to combine 2) and 3),
> i.e. just use process timeout as rebalance timeout. That means we need to
> either increase the rebalance timeout to adapt to the process timeout,
> or the reverse. It would be helpful to understand the impact of these two
> cases. Here are my two cents.
>
> Users who are consuming data from Kafka usually care about either
> throughput or latency.
>
> If users care about latency, they would probably care more about the
> average latency than the 99.99 percentile latency, which can be affected
> by many more common causes than consumer failure. Because all the
> timeouts we are discussing here only have an impact on the 99.99
> percentile latency, I don't think they would really make a difference for
> latency-sensitive users.
>
> The majority of the use cases for Kafka Connect and Mirror Maker are
> throughput sensitive. Ewen raised a good example where Kafka Connect needs
> to process the previous data on rebalance therefore requires a higher
> rebalance timeout than process timeout. This is essentially the same in
> Mirror Maker, where each rebalance needs to flush all the messages in the
> accumulator in the producer. That could take some time depending on how
> many messages there are. In this case, we may need to increase the process
> timeout to make it the same as rebalance timeout. But this is probably
> fine. The downside of increasing process timeout is a longer detection time
> of a consumer failure.  Detecting a consumer failure a little later only
> has limited impact because the rest of the consumers in the same group are
> still working fine. So the total throughput is unlikely to drop
> significantly. As long as the rebalance itself is not taking longer, it should be
> fine. The reason we care more about how fast rebalance can finish is
> because during rebalance no consumer in the group is consuming, i.e.
> throughput is zero. So we want to make the rebalance finish as quickly as
> possible.
>
> Compared with increasing the process timeout to the rebalance timeout, it
> seems a more common case is that the user wants a longer process timeout
> but a smaller rebalance timeout. I am more worried about this case where
> we have to shoehorn the rebalance timeout into the process timeout. For
> users who care about throughput, that might cause the rebalance to take
> unnecessarily long. Admittedly this only has an impact when a consumer
> has a problem during a rebalance, but depending on how long the process
> timeout was set, the rebalance could potentially take forever, as
> Guozhang mentioned.
>
> I agree with Guozhang that we can start with 1) and 2) and add 3) later if
> needed. But adding a rebalance timeout is more involved than just adding a
> configuration. It also means the rebalance has to be done in the
> background heartbeat thread. Hence we would have to synchronize the
> rebalance and consumer.poll() like we did in the old consumer. Otherwise
> users may lose messages if auto commit is enabled, or a manual commit
> might fail after a consumer.poll() because the partitions might have been
> reassigned. So having a separate rebalance timeout also potentially means
> a big change for users as well.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Fri, Jun 3, 2016 at 11:45 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hey Ewen,
> >
> > I confess your comments caught me off guard. It never occurred to me that
> > anyone would ask for a rebalance timeout so that it could be set _larger_
> > than the process timeout. Even with buffered or batch processing, I would
> > usually expect flushing before a rebalance to take no more time than a
> > periodic flush. Otherwise, I'd probably try to see if there was some
> > workload I could push into periodic flushes so that rebalances could
> > complete faster. But supposing this isn't possible or practical in some
> > cases, I'm wondering how limiting it would be in practice to have only
> the
> > one timeout in this case? I'm a little reluctant to add the additional
> > timeout since I think most users would not have a strong need to keep a
> > tight bound on normal processing time. (I'm also reminded that Jay
> > mentioned he might have to dock everyone's pay 5% for each new timeout we
> > introduce ;-)
> >
> > Thanks,
> > Jason
> >
> >
> >
> >
> > On Thu, Jun 2, 2016 at 7:30 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hi Ewen,
> > >
> > > I think you are right, the rebalance process could potentially involve all
> > > the delayed compute / IO. More specifically, this is how I think of the
> > > rebalance process (a listener sketch follows the steps):
> > >
> > > 1. Coordinator decides to rebalance; starts ticking based on the rebalance
> > > timeout.
> > > 2. Consumer realizes a rebalance is needed when calling poll(); triggers
> > > onPartitionsRevoked().
> > > 3. Consumer sends JoinGroupRequest.
> > > 4. Coordinator sends JoinGroupResponse; starts ticking on the leader.
> > > 5. Leader computes the assignment and sends SyncGroupRequest.
> > > 6. Coordinator sends SyncGroupResponse; starts ticking on the session
> > > timeout.
> > > 7. Consumer gets the new assignment; triggers onPartitionsAssigned().
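> > >
> > > For illustration, a minimal sketch of where those hooks run user code
> > > (consumer and topics are assumed to exist; flushAndCommit / initState
> > > are hypothetical placeholders):
> > >
> > > consumer.subscribe(topics, new ConsumerRebalanceListener() {
> > >   @Override
> > >   public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> > >     // step 2: runs inside poll() before the JoinGroupRequest is sent, so
> > >     // any delayed compute / IO (flushing, committing) lands here
> > >     flushAndCommit(partitions);
> > >   }
> > >   @Override
> > >   public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> > >     // step 7: runs after the assignment is received, so workload / state
> > >     // initialization lands here
> > >     initState(partitions);
> > >   }
> > > });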
> > >
> > > In the above process: delayed compute / IO is usually done at step 2);
> > > workload initialization is usually done in step 7); and some admin work
> > > (like in Kafka Streams) is likely to be done in step 5). As in the current
> > > KIP proposal, the rebalance timeout on the coordinator starts ticking at 1)
> > > for everyone in the group and stops ticking at 3); it starts ticking on the
> > > leader again at step 4) and stops at step 5). In this case the delayed
> > > compute / IO contained in step 2) is covered by this rebalance timeout.
> > >
> > > That being said, I think for the "worst case", the time of processing a
> > > single record would still be similar to rebalancing, since both could
> > > result in completing all the delayed compute / IO accumulated so far. And
> > > since the "processing timeout" is used to cover the worst case, it should
> > > still be OK?
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Thu, Jun 2, 2016 at 5:55 PM, Ewen Cheslack-Postava <
> e...@confluent.io
> > >
> > > wrote:
> > >
> > > > Jason,
> > > >
> > > > I've been thinking about this more in terms of something like
> Connect.
> > I
> > > > think the rebalance timeout may be a bit different from the process
> > > > timeout, and even the process timeout is a bit of a misnomer.
> > > >
> > > > We sort of talk about the process timeout as if it can be an
> indicator
> > of
> > > > maximum processing time for a record/batch. This makes sense for a
> case
> > > of
> > > > a data-dependent load (i.e. you can only load some data from slow
> > storage
> > > > after seeing some data) where that load might be very large compared
> to
> > > > normal processing time. It also makes sense if you have auto commit
> > > enabled
> > > > because you need to be completely finished processing the data before
> > > > calling poll() again, so that time before you call another consumer
> API
> > > > actually reflects processing time.
> > > >
> > > > It might make less sense in cases like streams (or any other app)
> that
> > > > batch writes to disk, or connectors that "process" a message by
> > enqueuing
> > > > the data, but won't commit offsets until data is flushed, possibly
> > during
> > > > some other, much later iteration of processing. In this case I think
> > > > processing time and rebalance time could potentially differ
> > > significantly.
> > > > During normal processing, you can potentially pipeline quite a bit,
> > > > buffering up changes, flushing as needed, but then only committing
> once
> > > > flushing is complete. But rebalancing is different then -- you *must*
> > > > finish flushing all the data or manually choose to discard the data
> > > > (presumably by doing something like watching for the process timeout
> > you
> > > > set and bailing early, only committing the offsets for data you've
> > > > flushed). If you have lots of data built up, the cost for rebalancing
> > > could
> > > > be a *lot* higher than the maximum time you would otherwise see
> between
> > > > calls to consumer APIs to indicate processing progress.
> > > >
> > > > The thing that makes these cases different is that processing isn't
> > > > actually tied to calls to the consumer API. You can queue up /
> > pipeline /
> > > > defer some of the work. (By the way, this is currently a limitation
> of
> > > sink
> > > > connectors that I'm not thrilled about -- offset commit requires a
> full
> > > > flush, whereas some coordination with the sink connector to not
> > require a
> > > > full flush except on rebalances would be much nicer, albeit more
> > > difficult
> > > > for sink connectors to implement.)
> > > >
> > > > -Ewen
> > > >
> > > >
> > > >
> > > > On Thu, Jun 2, 2016 at 5:14 PM, Jason Gustafson <ja...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hey Guozhang,
> > > > >
> > > > > I'm actually not too concerned about the time spent in the
> rebalance
> > > > > callback specifically. Both it and regular processing time in the
> > poll
> > > > loop
> > > > > will delay the rebalance and keep joined consumers idle. However,
> if
> > we
> > > > > expose the rebalance timeout, then it would give users the option
> to
> > > > > effectively disable the process timeout while still keeping a maximum
> > > bound
> > > > > on the rebalance time. If the consumer cannot complete its
> processing
> > > > fast
> > > > > enough and rejoin, then it would be evicted. This provides
> something
> > > like
> > > > > (2) since the other consumers in the group would be able to
> complete
> > > the
> > > > > rebalance and resume work while the evicted consumer would have to
> > > > roll back
> > > > > progress. This is not too different from rebalancing in the
> > background
> > > > > which also typically would cause commit failure and rollback
> (though
> > at
> > > > > least the consumer stays in the group).
> > > > >
> > > > > Now that I'm thinking about it more, I'm not sure this would be a
> > great
> > > > > facility to depend on in practice. It might be OK if just one or
> two
> > of
> > > > the
> > > > > consumers fall out of the group during the rebalance, but if half
> the
> > > > group
> > > > > is regularly getting evicted, it would be a problem. So even if we
> > > expose
> > > > > the rebalance timeout, the user is still going to have to set it
> with
> > > > some
> > > > > idea in mind about how long processing should take.
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Thu, Jun 2, 2016 at 2:46 PM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi Jason,
> > > > > >
> > > > > > With the current usage pattern of:
> > > > > >
> > > > > > while (running) {
> > > > > >
> > > > > >   ConsumerRecords<K, V> records = consumer.poll(timeout); // where rebalance happens
> > > > > >
> > > > > >   // process messages
> > > > > > }
> > > > > >
> > > > > > ----------
> > > > > >
> > > > > > And since the rebalance is still on the caller thread, not the background
> > > > > > thread, if the coordinator decides to rebalance while the user thread is
> > > > > > still processing messages, there is no option but to go with 1), right? I
> > > > > > think your / Onur's point here, which I agree with, is that by reusing the
> > > > > > process timeout as the rebalance timeout, if the rebalance callback could
> > > > > > take longer than processing a batch, users need to set the timeout value
> > > > > > to the higher of the two, i.e. the callback latency, which will make
> > > > > > detection of processing stalls less effective, right?
> > > > > >
> > > > > > As I mentioned in my previous email, I feel that this case of "callback
> > > > > > function taking longer than processing a batch" would not be frequent in
> > > > > > practice, and the processing timeout would usually be a good upper bound
> > > > > > on the callback function latency. If that is true, I'd suggest we keep
> > > > > > the current proposal and not add a third timeout config for covering
> > > > > > this case.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Thu, Jun 2, 2016 at 10:40 AM, Jason Gustafson <
> > ja...@confluent.io
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Guozhang,
> > > > > > >
> > > > > > > I think the problem is that users may not want to sacrifice
> > > rebalance
> > > > > > > latency because of uncertainty around processing time. As soon
> > as a
> > > > > > > rebalance begins, there are basically two choices:
> > > > > > >
> > > > > > > 1. Block the rebalance until all consumers have finished their
> > > > current
> > > > > > > processing.
> > > > > > > 2. Let all consumers rebalance and "rollback" any processing
> that
> > > > could
> > > > > > not
> > > > > > > be committed before the rebalance completes.
> > > > > > >
> > > > > > > If you choose option (1), then you have an incentive to keep a
> > > > > relatively
> > > > > > > tight bound on process.timeout.ms in order to reduce the
> > > worst-case
> > > > > idle
> > > > > > > time during a rebalance. But if you fail to set it high enough,
> > > then
> > > > > > you'll
> > > > > > > get spurious rebalances during normal processing. I think Onur
> is
> > > > > saying
> > > > > > > that this still sort of sucks for users. On the other hand, if
> > (2)
> > > is
> > > > > > > acceptable, then users will have more freedom to err on the
> high
> > > side
> > > > > > when
> > > > > > > setting process.timeout.ms, or even disable it entirely. They
> > will
> > > > > have
> > > > > > to
> > > > > > > deal with rolling back any progress which cannot be committed
> > after
> > > > the
> > > > > > > rebalance completes, but maybe this is less of a problem for
> some
> > > > > users?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Jason
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jun 1, 2016 at 10:23 PM, Guozhang Wang <
> > wangg...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Onur, Jason:
> > > > > > > >
> > > > > > > > Here are some thoughts about reusing process timeout as
> > > server-side
> > > > > > > > rebalance timeout: First of all, my understanding is that
> > > > > > > >
> > > > > > > > 1) session timeout is for detecting consumer crash / hard failures (in
> > > > > > > > this case the heartbeat thread will be dead as well, hence the
> > > > > > > > coordinator realizes it within the session timeout value).
> > > > > > > >
> > > > > > > > 2) process timeout is for checking liveness of the user thread that
> > > > > > > > calls the consumer as well as does the processing: when no consumer
> > > > > > > > calls are made within the process timeout, the heartbeat thread stops
> > > > > > > > working and hence it will be detected by the coordinator.
> > > > > > > >
> > > > > > > > 3) a potential server-side rebalance timeout would be used to detect
> > > > > > > > consumer liveness during the rebalance period, in which the user thread
> > > > > > > > is tied up in the "poll" call and also the callback function, to
> > > > > > > > prevent a slow / stalled consumer in its rebalance callback from
> > > > > > > > causing the rebalance to take forever.
> > > > > > > >
> > > > > > > > I think we generally have two cases in practice regarding 3) above:
> > > > > > > > users either do almost nothing and hence should never be stalled
> > > > > > > > (unless there is a long GC), or they do various external IOs for
> > > > > > > > maintaining their own states, for example, which could take long or
> > > > > > > > even cause the thread to stall. We do not need to worry too much about
> > > > > > > > the former case, and as for the latter case the process timeout value
> > > > > > > > should usually be a good upper bound on the rebalance latency.
> > > > > > > >
> > > > > > > > That being said, if we observe that there is indeed a common usage
> > > > > > > > where 2) and 3) would require very different timeout values, and the
> > > > > > > > benefit outweighs the complexity of a third timeout value, we can
> > > > > > > > consider adding a third one then: it is easier to add more configs
> > > > > > > > later.
> > > > > > > >
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, May 31, 2016 at 2:35 PM, Jason Gustafson <
> > > > ja...@confluent.io
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Onur,
> > > > > > > > >
> > > > > > > > > Thanks for the detailed response. I think the problem of
> > > > > controlling
> > > > > > > > > rebalance times is the main (known) gap in the proposal as
> it
> > > > > stands.
> > > > > > > > >
> > > > > > > > > This burden goes away if you loosen the liveness property
> by
> > > > > having a
> > > > > > > > > > required rebalance time and optional processing time
> where
> > > > > > rebalance
> > > > > > > > > > happens in the background thread as stated in the KIP.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Just to clarify, the current KIP only allows rebalances to complete in
> > > > > > > > > the foreground. What I suggested above in reply to Grant was that we
> > > > > > > > > could add a separate rebalance timeout setting; the behavior I had in
> > > > > > > > > mind was to let the consumer fall out of the group if the timeout is
> > > > > > > > > reached while the consumer is still processing. I was specifically
> > > > > > > > > trying to avoid moving the rebalance to the background thread since
> > > > > > > > > this significantly increases the complexity of the implementation.
> > > > > > > > > We'd also have to think about compatibility a bit more. For example,
> > > > > > > > > what are the implications of having the rebalance listener execute in
> > > > > > > > > a separate thread?
> > > > > > > > >
> > > > > > > > > Putting that issue aside, I think we need to convince
> > ourselves
> > > > > that
> > > > > > a
> > > > > > > > > separate rebalance timeout is really necessary since every
> > new
> > > > > > timeout
> > > > > > > > adds
> > > > > > > > > some conceptual noise which all users will see. My thought
> in
> > > > this
> > > > > > KIP
> > > > > > > > was
> > > > > > > > > that users who didn't want the burden of tuning the process
> > > > timeout
> > > > > > > could
> > > > > > > > > use a relatively large value without a major impact because
> > > group
> > > > > > > > > rebalances themselves will typically be infrequent. The
> main
> > > > > concern
> > > > > > is
> > > > > > > > for
> > > > > > > > > users who have highly variant processing times and want to
> > > > ensure a
> > > > > > > tight
> > > > > > > > > bound on rebalance times (even if it means having to
> discard
> > > some
> > > > > > > > > processing that cannot be completed before the rebalance
> > > > finishes).
> > > > > > > These
> > > > > > > > > users will be left trying to tune process.timeout.ms and
> > > > > > > > max.poll.records,
> > > > > > > > > which is basically the same position they are currently in.
> > The
> > > > > > problem
> > > > > > > > is
> > > > > > > > > I don't know how common this case is, so I'm not sure how
> it
> > > > weighs
> > > > > > > > against
> > > > > > > > > the cost of having an additional timeout that needs to be
> > > > > explained.
> > > > > > We
> > > > > > > > can
> > > > > > > > > always add the rebalance timeout later, but it will be tough to remove
> > > > > > > > > once it's there. All the same, I'm not that keen on another
> > > > > iteration
> > > > > > > of
> > > > > > > > > this problem, so if we believe this use case is common
> > enough,
> > > > then
> > > > > > > maybe
> > > > > > > > > we should add it now.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Jason
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Sat, May 28, 2016 at 3:10 AM, Onur Karaman <
> > > > > > > > > onurkaraman.apa...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for the KIP writeup, Jason.
> > > > > > > > > >
> > > > > > > > > > Before anything else, I just wanted to point out that
> it's
> > > > worth
> > > > > > > > > mentioning
> > > > > > > > > > the "heartbeat.interval.ms" consumer config in the KIP
> for
> > > > > > > > completeness.
> > > > > > > > > > Today this config only starts to kick in if poll is
> called
> > > > > > frequently
> > > > > > > > > > enough. A separate heartbeat thread should make this
> config
> > > > > behave
> > > > > > > more
> > > > > > > > > > like what people would expect: a separate thread sending
> > > > > heartbeats
> > > > > > > at
> > > > > > > > > the
> > > > > > > > > > configured interval.
> > > > > > > > > >
> > > > > > > > > > With this KIP, the relevant configs become:
> > > > > > > > > > "max.poll.records" - already exists
> > > > > > > > > > "session.timeout.ms" - already exists
> > > > > > > > > > "heartbeat.interval.ms" - already exists
> > > > > > > > > > "process.timeout.ms" - new
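> > > > > > > > > >
> > > > > > > > > > For example, a config sketch with purely illustrative values (and
> > > > > > > > > > assuming the proposed "process.timeout.ms" name sticks):
> > > > > > > > > >
> > > > > > > > > > Properties props = new Properties();
> > > > > > > > > > props.put("max.poll.records", "500");
> > > > > > > > > > props.put("session.timeout.ms", "10000");    // crash detection via background heartbeats
> > > > > > > > > > props.put("heartbeat.interval.ms", "3000");  // cadence of the heartbeat thread
> > > > > > > > > > props.put("process.timeout.ms", "300000");   // proposed bound on time between poll() calls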
> > > > > > > > > >
> > > > > > > > > > After reading the KIP several times, I think it would be helpful to
> > > > > > > > > > be more explicit about the desired outcome. Is it trying to make
> > > > > > > > > > best/average/worst case rebalance times faster? Is it trying to make
> > > > > > > > > > the clients need less configuration tuning?
> > > > > > > > > >
> > > > > > > > > > Also it seems that brokers probably still want to enforce
> > > > minimum
> > > > > > and
> > > > > > > > > > maximum rebalance timeouts just as with the minimum and
> > > maximum
> > > > > > > session
> > > > > > > > > > timeouts so DelayedJoins don't stay in purgatory
> > > indefinitely.
> > > > So
> > > > > > > we'd
> > > > > > > > > add
> > > > > > > > > > new "group.min.rebalance.timeout.ms" and "
> > > > > > > > group.max.rebalance.timeout.ms
> > > > > > > > > "
> > > > > > > > > > broker configs which again might need to be brought up in
> > the
> > > > > KIP.
> > > > > > > > Let's
> > > > > > > > > > say we add these bounds. A side-effect of having
> > broker-side
> > > > > bounds
> > > > > > > on
> > > > > > > > > > rebalance timeouts in combination with Java clients that
> > > make
> > > > > > > process
> > > > > > > > > > timeouts the same as rebalance timeouts is that the
> broker
> > > > > > > effectively
> > > > > > > > > > dictates the max processing time allowed between poll
> > calls.
> > > > This
> > > > > > > > gotcha
> > > > > > > > > > exists right now with today's broker-side bounds on
> session
> > > > > > timeouts.
> > > > > > > > So
> > > > > > > > > > I'm not really convinced that the proposal gets rid of
> this
> > > > > > > > complication
> > > > > > > > > > mentioned in the KIP.
> > > > > > > > > >
> > > > > > > > > > I think the main question to ask is: does the KIP
> actually
> > > > make a
> > > > > > > > > > difference?
> > > > > > > > > >
> > > > > > > > > > It looks like this KIP improves rebalance times
> > specifically
> > > > when
> > > > > > the
> > > > > > > > > > client currently has processing times large enough to
> force
> > > > > larger
> > > > > > > > > session
> > > > > > > > > > timeouts and heartbeat intervals to not be honored.
> > > Separating
> > > > > > > session
> > > > > > > > > > timeouts from processing time means clients can keep
> their
> > "
> > > > > > > > > > session.timeout.ms" low so the coordinator can quickly
> > > detect
> > > > > > > process
> > > > > > > > > > failure, and honoring a low "heartbeat.interval.ms" on
> the
> > > > > > separate
> > > > > > > > > > heartbeat thread means clients will be quickly notified
> of
> > > > group
> > > > > > > > > membership
> > > > > > > > > > and subscription changes - all without placing difficult
> > > > > > expectations
> > > > > > > > on
> > > > > > > > > > processing time. But even so, rebalancing through the
> > calling
> > > > > > thread
> > > > > > > > > means
> > > > > > > > > > the slowest processing client in the group will still be
> > the
> > > > rate
> > > > > > > > > limiting
> > > > > > > > > > step when looking at rebalance times.
> > > > > > > > > >
> > > > > > > > > > From a usability perspective, the burden still seems like
> > it
> > > > will
> > > > > > be
> > > > > > > > > tuning
> > > > > > > > > > the processing time to keep the "progress liveness" happy
> > > > during
> > > > > > > > > rebalances
> > > > > > > > > > while still having reasonable upper bounds on rebalance
> > > times.
> > > > It
> > > > > > > still
> > > > > > > > > > looks like users have to do almost the exact same tricks as today
> > > > > > > > > > when the group membership changes due to slow processing times even
> > > > > > > > > > though all the consumers are alive and the topics haven't changed:
> > > > > > > > > > 1. Increase the rebalance timeout to give more time for
> > > record
> > > > > > > > processing
> > > > > > > > > > (the difference compared to today is that we bump the
> > > rebalance
> > > > > > > timeout
> > > > > > > > > > instead of session timeout).
> > > > > > > > > > 2. Reduce the number of records handled on each iteration
> > > with
> > > > > > > > > > max.poll.records.
> > > > > > > > > >
> > > > > > > > > > This burden goes away if you loosen the liveness property
> > by
> > > > > > having a
> > > > > > > > > > required rebalance time and optional processing time
> where
> > > > > > rebalance
> > > > > > > > > > happens in the background thread as stated in the KIP.
> > > > > > > > > >
> > > > > > > > > > On Thu, May 26, 2016 at 12:40 PM, Jason Gustafson <
> > > > > > > ja...@confluent.io>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey Grant,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the feedback. I'm definitely open to
> including
> > > > > > > heartbeat()
> > > > > > > > > in
> > > > > > > > > > > this KIP. One thing we should be clear about is what
> the
> > > > > behavior
> > > > > > > of
> > > > > > > > > > > heartbeat() should be when the group begins
> rebalancing.
> > I
> > > > > think
> > > > > > > > there
> > > > > > > > > > are
> > > > > > > > > > > basically two options:
> > > > > > > > > > >
> > > > > > > > > > > 1. heartbeat() simply keeps heartbeating even if the
> > group
> > > > has
> > > > > > > > started
> > > > > > > > > > > rebalancing.
> > > > > > > > > > > 2. heartbeat() completes the rebalance itself.
> > > > > > > > > > >
> > > > > > > > > > > With the first option, when processing takes longer
> than
> > > the
> > > > > > > > rebalance
> > > > > > > > > > > timeout, the member will fall out of the group which
> will
> > > > cause
> > > > > > an
> > > > > > > > > offset
> > > > > > > > > > > commit failure when it finally finishes. However, if
> > > > processing
> > > > > > > > > finishes
> > > > > > > > > > > before the rebalance completes, then offsets can still
> be
> > > > > > > committed.
> > > > > > > > On
> > > > > > > > > > the
> > > > > > > > > > > other hand, if heartbeat() completes the rebalance
> > itself,
> > > > then
> > > > > > > > you'll
> > > > > > > > > > > definitely see the offset commit failure for any
> records
> > > > being
> > > > > > > > > processed.
> > > > > > > > > > > So the first option is sort of biased toward processing
> > > > > > completion
> > > > > > > > > while
> > > > > > > > > > > the latter is biased toward rebalance completion.
> > > > > > > > > > >
> > > > > > > > > > > I'm definitely not a fan of the second option since it takes away
> > > > > > > > > > > the choice to finish processing before rejoining. However, I do see
> > some
> > > > > > benefit
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > first option if the user wants to keep rebalance time
> low
> > > and
> > > > > > > doesn't
> > > > > > > > > > mind
> > > > > > > > > > > being kicked out of the group if processing takes
> longer
> > > > > during a
> > > > > > > > > > > rebalance. This may be a reasonable tradeoff since
> > consumer
> > > > > > groups
> > > > > > > > are
> > > > > > > > > > > presumed to be stable most of the time. A better option
> > in
> > > > that
> > > > > > > case
> > > > > > > > > > might
> > > > > > > > > > > be to expose the rebalance timeout to the user directly
> > > since
> > > > > it
> > > > > > > > would
> > > > > > > > > > > allow the user to use an essentially unbounded
> > > > > > process.timeout.ms
> > > > > > > > for
> > > > > > > > > > > highly variant processing while still keeping rebalance
> > > time
> > > > > > > limited.
> > > > > > > > > Of
> > > > > > > > > > > course, it would be another timeout for the user to
> > > > > understand...
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Jason
> > > > > > > > > > >
> > > > > > > > > > > On Thu, May 26, 2016 at 8:19 AM, Grant Henke <
> > > > > > ghe...@cloudera.com>
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Jason,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for writing up a proposal (and a thorough
> one)!
> > > This
> > > > > is
> > > > > > > > > > something
> > > > > > > > > > > > that I had been thinking about this week too as I
> have
> > > run
> > > > > into
> > > > > > > it
> > > > > > > > > more
> > > > > > > > > > > > than a handful of times now.
> > > > > > > > > > > >
> > > > > > > > > > > > I like the idea of having a larger processing timeout; that
> > > > > > > > > > > > timeout in unison with max.poll.records should in many cases
> > > > > > > > > > > > provide a reasonable assurance that the consumer will stay alive.
> > > > > > > > > > > >
> > > > > > > > > > > > In rejected alternatives "Add a separate API the user
> > can
> > > > > call
> > > > > > to
> > > > > > > > > > > indicate
> > > > > > > > > > > > liveness" is listed. I think a heartbeat api could be
> > > added
> > > > > > along
> > > > > > > > > with
> > > > > > > > > > > > these new timeout configurations and used for
> > "advanced"
> > > > use
> > > > > > > cases
> > > > > > > > > > where
> > > > > > > > > > > > the processing time could be highly variant and less
> > > > > > > predictable. I
> > > > > > > > > > > think a
> > > > > > > > > > > > place where we might use the heartbeat api in Kafka
> is
> > > > > > > MirrorMaker.
> > > > > > > > > > > >
> > > > > > > > > > > > Today, I have seen people trying to find ways to
> > leverage
> > > > the
> > > > > > > > > existing
> > > > > > > > > > > api
> > > > > > > > > > > > to "force" heartbeats by:
> > > > > > > > > > > >
> > > > > > > > > > > > 1. Calling poll to get the batch of records to
> process
> > > > > > > > > > > > 2. Call pause on all partitions
> > > > > > > > > > > > 3. Process the record batch
> > > > > > > > > > > > 3a. While processing periodically call poll (which is
> > > > > > essentially
> > > > > > > > > just
> > > > > > > > > > > > heartbeat since it returns no records and is paused)
> > > > > > > > > > > > 4. Commit offsets and un-pause
> > > > > > > > > > > > 5. Repeat from 1
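> > > > > > > > > > > >
> > > > > > > > > > > > Roughly, that workaround looks like the sketch below (signatures per
> > > > > > > > > > > > the newer consumer API; processRecord() is a hypothetical helper and
> > > > > > > > > > > > error handling is omitted):
> > > > > > > > > > > >
> > > > > > > > > > > > ConsumerRecords<String, String> records = consumer.poll(1000); // 1. fetch a batch
> > > > > > > > > > > > consumer.pause(consumer.assignment());                         // 2. pause all partitions
> > > > > > > > > > > > for (ConsumerRecord<String, String> record : records) {
> > > > > > > > > > > >   processRecord(record);                                       // 3. do the real work
> > > > > > > > > > > >   consumer.poll(0);                                            // 3a. acts as a heartbeat; returns nothing while paused
> > > > > > > > > > > > }
> > > > > > > > > > > > consumer.commitSync();                                         // 4. commit offsets...
> > > > > > > > > > > > consumer.resume(consumer.assignment());                        //    ...and un-pause, then repeat from 1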
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Grant
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, May 25, 2016 at 6:32 PM, Jason Gustafson <
> > > > > > > > ja...@confluent.io
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi All,
> > > > > > > > > > > > >
> > > > > > > > > > > > > One of the persistent problems we see with the new
> > > > consumer
> > > > > > is
> > > > > > > > the
> > > > > > > > > > use
> > > > > > > > > > > of
> > > > > > > > > > > > > the session timeout in order to ensure progress.
> > > Whenever
> > > > > > there
> > > > > > > > is
> > > > > > > > > a
> > > > > > > > > > > > delay
> > > > > > > > > > > > > in message processing which exceeds the session
> > > timeout,
> > > > no
> > > > > > > > > > heartbeats
> > > > > > > > > > > > can
> > > > > > > > > > > > > be sent and the consumer is removed from the group.
> > We
> > > > seem
> > > > > > to
> > > > > > > > hit
> > > > > > > > > > this
> > > > > > > > > > > > > problem everywhere the consumer is used (including
> > > Kafka
> > > > > > > Connect
> > > > > > > > > and
> > > > > > > > > > > > Kafka
> > > > > > > > > > > > > Streams) and we don't always have a great solution.
> > > I've
> > > > > > > written
> > > > > > > > a
> > > > > > > > > > KIP
> > > > > > > > > > > to
> > > > > > > > > > > > > address this problem here:
> > > > > > > > > > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > > > > > > > > > > > > .
> > > > > > > > > > > > > Have a look and let me know what you think.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Jason
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > > Grant Henke
> > > > > > > > > > > > Software Engineer | Cloudera
> > > > > > > > > > > > gr...@cloudera.com | twitter.com/gchenke |
> > > > > > > > > linkedin.com/in/granthenke
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Ewen
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
