Hey Onur,

Thanks for the detailed response. I think the problem of controlling
rebalance times is the main (known) gap in the proposal as it stands.

> This burden goes away if you loosen the liveness property by having a
> required rebalance time and optional processing time where rebalance
> happens in the background thread as stated in the KIP.


Just to clarify, the current KIP only allows rebalances to complete in the
foreground. What I suggested above in reply to Grant was that we could add
a separate rebalance timeout setting, and the behavior I had in mind was to let
the consumer fall out of the group if the timeout is reached while the
consumer is still processing. I was specifically trying to avoid moving the
rebalance to the background thread since this significantly increases the
complexity of the implementation. We'd also have to think about
compatibility a bit more. For example, what are the implications of having
the rebalance listener execute in a separate thread?

Putting that issue aside, I think we need to convince ourselves that a
separate rebalance timeout is really necessary since every new timeout adds
some conceptual noise which all users will see. My thought in this KIP was
that users who didn't want the burden of tuning the process timeout could
use a relatively large value without a major impact because group
rebalances themselves will typically be infrequent. The main concern is for
users who have highly variant processing times and want to ensure a tight
bound on rebalance times (even if it means having to discard some
processing that cannot be completed before the rebalance finishes). These
users will be left trying to tune process.timeout.ms and max.poll.records,
which is basically the same position they are currently in. The problem is
I don't know how common this case is, so I'm not sure how it weighs against
the cost of having an additional timeout that needs to be explained. We can
always add the rebalance timeout later, but it will be tough to remove
once it's there. All the same, I'm not that keen on another iteration of
this problem, so if we believe this use case is common enough, then maybe
we should add it now.
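
To make the tradeoff concrete, here is a rough sketch (not taken from the
KIP) of how a user with fairly predictable per-record times might combine
the proposed process.timeout.ms with max.poll.records. The class name and
config values are purely illustrative, and today's client would just warn
about the unknown process.timeout.ms config:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TunedConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("session.timeout.ms", "10000");   // fast failure detection
        props.put("heartbeat.interval.ms", "3000"); // background heartbeats
        props.put("max.poll.records", "100");       // bound work per poll()
        // With at most 100 records per poll() and roughly one second per
        // record in the worst case, the gap between polls stays under ~100
        // seconds, so a process timeout above that should keep the consumer
        // in the group during normal processing.
        props.put("process.timeout.ms", "120000");  // name proposed in this KIP
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}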

Thanks,
Jason


On Sat, May 28, 2016 at 3:10 AM, Onur Karaman <onurkaraman.apa...@gmail.com>
wrote:

> Thanks for the KIP writeup, Jason.
>
> Before anything else, I just wanted to point out that it's worth mentioning
> the "heartbeat.interval.ms" consumer config in the KIP for completeness.
> Today this config only starts to kick in if poll is called frequently
> enough. A separate heartbeat thread should make this config behave more
> like what people would expect: a separate thread sending heartbeats at the
> configured interval.
>
> With this KIP, the relevant configs become:
> "max.poll.records" - already exists
> "session.timeout.ms" - already exists
> "heartbeat.interval.ms" - already exists
> "process.timeout.ms" - new
>
> After reading the KIP several times, I think it would be helpful to be more
> explicit about the desired outcome. Is it trying to achieve faster
> best/average/worst case rebalance times? Is it trying to make the clients
> need less configuration tuning?
>
> Also it seems that brokers probably still want to enforce minimum and
> maximum rebalance timeouts just as with the minimum and maximum session
> timeouts so DelayedJoins don't stay in purgatory indefinitely. So we'd add
> new "group.min.rebalance.timeout.ms" and "group.max.rebalance.timeout.ms"
> broker configs which again might need to be brought up in the KIP. Let's
> say we add these bounds. A side-effect of having broker-side bounds on
> rebalance timeouts in combination with Java clients that make process
> timeouts the same as rebalance timeouts is that the broker effectively
> dictates the max processing time allowed between poll calls. This gotcha
> exists right now with today's broker-side bounds on session timeouts. So
> I'm not really convinced that the proposal gets rid of this complication
> mentioned in the KIP.
>
> I think the main question to ask is: does the KIP actually make a
> difference?
>
> It looks like this KIP improves rebalance times specifically when the
> client currently has processing times large enough to force larger session
> timeouts and heartbeat intervals to not be honored. Separating session
> timeouts from processing time means clients can keep their "
> session.timeout.ms" low so the coordinator can quickly detect process
> failure, and honoring a low "heartbeat.interval.ms" on the separate
> heartbeat thread means clients will be quickly notified of group membership
> and subscription changes - all without placing difficult expectations on
> processing time. But even so, rebalancing through the calling thread means
> the slowest processing client in the group will still be the rate limiting
> step when looking at rebalance times.
>
> From a usability perspective, the burden still seems like it will be tuning
> the processing time to keep the "progress liveness" happy during rebalances
> while still having reasonable upper bounds on rebalance times. It still
> looks like users have to do almost the exact same tricks as today when the
> group membership changes due to slow processing times even though all the
> consumers are alive and the topics haven't changed:
> 1. Increase the rebalance timeout to give more time for record processing
> (the difference compared to today is that we bump the rebalance timeout
> instead of session timeout).
> 2. Reduce the number of records handled on each iteration with
> max.poll.records.
>
> This burden goes away if you loosen the liveness property by having a
> required rebalance time and optional processing time where rebalance
> happens in the background thread as stated in the KIP.
>
> On Thu, May 26, 2016 at 12:40 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hey Grant,
> >
> > Thanks for the feedback. I'm definitely open to including heartbeat() in
> > this KIP. One thing we should be clear about is what the behavior of
> > heartbeat() should be when the group begins rebalancing. I think there are
> > basically two options:
> >
> > 1. heartbeat() simply keeps heartbeating even if the group has started
> > rebalancing.
> > 2. heartbeat() completes the rebalance itself.
> >
> > With the first option, when processing takes longer than the rebalance
> > timeout, the member will fall out of the group which will cause an offset
> > commit failure when it finally finishes. However, if processing finishes
> > before the rebalance completes, then offsets can still be committed. On the
> > other hand, if heartbeat() completes the rebalance itself, then you'll
> > definitely see the offset commit failure for any records being processed.
> > So the first option is sort of biased toward processing completion while
> > the latter is biased toward rebalance completion.
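> >
> > Just to illustrate option 1 from the user's side: heartbeat() here is the
> > hypothetical API we're discussing (today's KafkaConsumer has no such
> > method), the consumer is assumed to be already constructed, and process()
> > stands in for the user's possibly slow work. The consumer keeps
> > heartbeating during processing, and if the rebalance timeout expires
> > first, the eventual commit fails rather than blocking the group:
> >
> > // Hypothetical usage sketch only; heartbeat() does not exist today.
> > ConsumerRecords<String, String> records = consumer.poll(1000);
> > try {
> >     for (ConsumerRecord<String, String> record : records) {
> >         process(record);       // placeholder for the real work
> >         consumer.heartbeat();  // hypothetical: keep heartbeating (option 1)
> >     }
> >     consumer.commitSync();     // fails if we fell out during a rebalance
> > } catch (CommitFailedException e) {
> >     // Processing outlasted the rebalance timeout and the member was kicked
> >     // out; discard this batch's progress and rejoin on the next poll().
> > }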
> >
> > I'm definitely not a fan of the second option since it takes away the choice
> > to finish processing before rejoining. However, I do see some benefit in the
> > first option if the user wants to keep rebalance time low and doesn't mind
> > being kicked out of the group if processing takes longer during a
> > rebalance. This may be a reasonable tradeoff since consumer groups are
> > presumed to be stable most of the time. A better option in that case might
> > be to expose the rebalance timeout to the user directly since it would
> > allow the user to use an essentially unbounded process.timeout.ms for
> > highly variant processing while still keeping rebalance time limited. Of
> > course, it would be another timeout for the user to understand...
> >
> > Thanks,
> > Jason
> >
> > On Thu, May 26, 2016 at 8:19 AM, Grant Henke <ghe...@cloudera.com> wrote:
> >
> > > Hi Jason,
> > >
> > > Thanks for writing up a proposal (and a thorough one)! This is something
> > > that I had been thinking about this week too as I have run into it more
> > > than a handful of times now.
> > >
> > > I like the idea of having a larger processing timeout; that timeout in
> > > unison with max.poll.records should in many cases provide a reasonable
> > > assurance that the consumer will stay alive.
> > >
> > > In rejected alternatives "Add a separate API the user can call to
> > > indicate liveness" is listed. I think a heartbeat api could be added
> > > along with these new timeout configurations and used for "advanced" use
> > > cases where the processing time could be highly variant and less
> > > predictable. I think a place where we might use the heartbeat api in
> > > Kafka is MirrorMaker.
> > >
> > > Today, I have seen people trying to find ways to leverage the existing
> > > api to "force" heartbeats (roughly sketched after the steps below) by:
> > >
> > > 1. Calling poll to get the batch of records to process
> > > 2. Call pause on all partitions
> > > 3. Process the record batch
> > > 3a. While processing periodically call poll (which is essentially just
> > > heartbeat since it returns no records and is paused)
> > > 4. Commit offsets and un-pause
> > > 5. Repeat from 1
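> > >
> > > A rough sketch of that loop (not something from the KIP, and process()
> > > below is just a placeholder for the real work), assuming the 0.10 Java
> > > consumer API where pause/resume take the collection of assigned
> > > partitions:
> > >
> > > import org.apache.kafka.clients.consumer.ConsumerRecord;
> > > import org.apache.kafka.clients.consumer.ConsumerRecords;
> > > import org.apache.kafka.clients.consumer.KafkaConsumer;
> > >
> > > public class PausePollLoop {
> > >     static void process(ConsumerRecord<String, String> record) {
> > >         // placeholder for the actual (potentially slow) processing
> > >     }
> > >
> > >     static void run(KafkaConsumer<String, String> consumer) {
> > >         while (true) {
> > >             // 1. Poll to get the batch of records to process.
> > >             ConsumerRecords<String, String> records = consumer.poll(1000);
> > >             // 2. Pause all assigned partitions.
> > >             consumer.pause(consumer.assignment());
> > >             // 3. Process the batch...
> > >             for (ConsumerRecord<String, String> record : records) {
> > >                 process(record);
> > >                 // 3a. ...periodically calling poll, which acts as little
> > >                 // more than a heartbeat since the paused partitions
> > >                 // return no records.
> > >                 consumer.poll(0);
> > >             }
> > >             // 4. Commit offsets and un-pause, then 5. repeat from 1.
> > >             consumer.commitSync();
> > >             consumer.resume(consumer.assignment());
> > >         }
> > >     }
> > > }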
> > >
> > > Thanks,
> > > Grant
> > >
> > > On Wed, May 25, 2016 at 6:32 PM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > One of the persistent problems we see with the new consumer is the use
> > > > of the session timeout in order to ensure progress. Whenever there is a
> > > > delay in message processing which exceeds the session timeout, no
> > > > heartbeats can be sent and the consumer is removed from the group. We
> > > > seem to hit this problem everywhere the consumer is used (including
> > > > Kafka Connect and Kafka Streams) and we don't always have a great
> > > > solution. I've written a KIP to address this problem here:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > > >
> > > > Have a look and let me know what you think.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > >
> > >
> > >
> > > --
> > > Grant Henke
> > > Software Engineer | Cloudera
> > > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> > >
> >
>
