Hi All,

I wanted to give an update on KIP-62. The patch has gone through a number
of iterations and is getting closer to merging, but there have been a
couple of changes that were not part of the initial KIP that I wanted to
call out.

1. To make room for the rebalance timeout in the group metadata message
that we store in __consumer_offsets, we've had to introduce a new version
of the group metadata value schema. This means that upgrades will require
the usual two-round rolling upgrade, with the inter-broker protocol version
bump in between, to ensure that all brokers understand the new schema before
beginning to use it. It sounds like this may be needed anyway if KIP-74
gets in, but it's still worth mentioning.

2. Kafka Connect uses the same group management protocol as the consumer
for its own task rebalancing, which means it must also have a rebalance
timeout configured somewhere. Since Connect does not really expose a notion
of polling to the user, rather than exposing max.poll.interval.ms as we are
doing for the consumer, we've decided to expose the rebalance timeout
directly as rebalance.timeout.ms.
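
As a rough sketch of how the two settings would sit side by side, here is
what a consumer configuration and a Connect worker configuration might look
like. The class name, group names, and all numeric values below are made up
for illustration and are not defaults taken from the patch; only the config
keys themselves come from the discussion.

```java
import java.util.Properties;

public class RebalanceTimeoutConfigSketch {

    // Consumer side: the user bounds the time between poll() calls, and the
    // rebalance timeout follows from that value.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("group.id", "example-group");        // hypothetical name
        props.setProperty("session.timeout.ms", "10000");      // illustrative value
        props.setProperty("max.poll.interval.ms", "300000");   // illustrative value
        return props;
    }

    // Connect worker side: there is no user-visible poll loop, so the
    // rebalance timeout is set directly.
    static Properties connectWorkerProps() {
        Properties props = new Properties();
        props.setProperty("group.id", "example-connect-cluster"); // hypothetical name
        props.setProperty("session.timeout.ms", "10000");         // illustrative value
        props.setProperty("rebalance.timeout.ms", "60000");       // illustrative value
        return props;
    }

    public static void main(String[] args) {
        System.out.println("consumer: " + consumerProps());
        System.out.println("connect:  " + connectWorkerProps());
    }
}
```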

I am planning to update the KIP to reflect these changes. Please let me
know if you have any concerns.

Thanks,
Jason


On Mon, Jun 13, 2016 at 5:46 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Becket,
>
> Sorry for the late response. I agree there's a little more to think
> through on the implementation. The offset commit is the tricky one since we
> could execute a user-provided callback. I'm thinking if there is an
> inflight request to the coordinator, we may simply skip the heartbeat and
> try again after a short backoff. Probably the only other request that could
> compete with the heartbeat is an offset commit, and this would actually be
> fine since the coordinator treats offset commits as effective heartbeats.
> Anyway, unless you think this problem is serious enough for more
> discussion, I'm going to go ahead and start a vote in the next day or two.
>
> Thanks,
> Jason
>
> On Mon, Jun 6, 2016 at 2:15 PM, Becket Qin <becket....@gmail.com> wrote:
>
>> Guozhang and Jason,
>>
>> I think we are on the same page that having rebalances done in the
>> background thread has a much bigger impact on the users. So I agree that
>> it is probably better to start with having 1) and 2). We can add 3) later
>> if necessary.
>>
>> Another implementation detail I am not quite sure about is making the
>> NetworkClient work with two threads. The KIP implies that this will be
>> done by synchronizing on ConsumerNetworkClient. I am not sure that is
>> enough: what if a poll() from ConsumerNetworkClient receives a
>> FetchResponse or OffsetFetchResponse, which are supposed to be handled by
>> the user thread? This is an implementation detail, but it may be worth
>> thinking about a bit more.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>> On Mon, Jun 6, 2016 at 11:27 AM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>> > Jiangjie:
>> >
>> > About doing the rebalance in the background thread, I'm a bit concerned,
>> > as it will change a lot of the concurrency guarantees that the consumer
>> > currently provides (think of a consumer caller thread committing
>> > externally while the rebalance is happening in the background thread).
>> > Hence, if we are considering changing that now or in the future, we need
>> > to think through all the corner cases.
>> >
>> > So in general, I'd still prefer we reserve a third config for rebalance
>> > timeout in this KIP.
>> >
>> > Guozhang
>> >
>> >
>> > On Mon, Jun 6, 2016 at 11:25 AM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >
>> > > (+ Matthias)
>> > >
>> > > Hello Henry,
>> > >
>> > > Specifically to your question regarding Kafka Streams:
>> > >
>> > > 1. Currently restoreActiveState() is triggered in the
>> > > onPartitionsAssigned callback, which runs after the rebalance has
>> > > completed from the coordinator's point of view, and hence is covered by
>> > > the process timeout value in this new KIP.
>> > >
>> > > 2. That is a good question, and I think it is the general root cause of
>> > > the directory locking failures already reported by more than one use
>> > > case. Currently I believe the main reason a second rebalance is
>> > > triggered while the processors are still completing
>> > > restoreActiveState() from the previous rebalance is the session timeout
>> > > (default 30 seconds); this will be largely mitigated by a larger
>> > > process timeout. However, with complex topologies, restoreActiveState()
>> > > for all states may still take a long time with tens or hundreds of state
>> > > stores, and there are other cases that can cause consumers to rejoin
>> > > the group right after a previous rebalance, for example 1) regex
>> > > subscription where the topic metadata has changed, 2) consecutive
>> > > consumer failures, or new consumers (i.e. new KStream instances /
>> > > threads) being added.
>> > >
>> > > For such cases we can do a better job of "failing fast" if the consumer
>> > > detects that another join is needed. I think in one of your local
>> > > commits you are already doing something similar, which we can merge
>> > > back to trunk.
>> > >
>> > >
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Sun, Jun 5, 2016 at 11:24 PM, Henry Cai <h...@pinterest.com.invalid
>> >
>> > > wrote:
>> > >
>> > >> I have a question on the KIP about the long stall during
>> > >> ProcessorStateManager.restoreActiveState(). This can be a long stall
>> > >> when we need to rebuild the RocksDB state on a new node.
>> > >>
>> > >> 1. Is restoreActiveState() considered post-rebalance, since it is
>> > >> invoked in the application's rebalance listener?
>> > >> 2. Suppose node A is spending a long time rebuilding its state in
>> > >> restoreActiveState() from the previous rebalance when a new node (node
>> > >> B) sends a new JoinGroup request to the coordinator. How long should
>> > >> the coordinator wait for node A to finish restoreActiveState() from the
>> > >> previous rebalance? restoreActiveState() can take more than 10 minutes
>> > >> for a big state.
>> > >>
>> > >>
>> > >> On Sun, Jun 5, 2016 at 10:46 PM, Becket Qin <becket....@gmail.com>
>> > wrote:
>> > >>
>> > >> > Hi Jason,
>> > >> >
>> > >> > Thanks for this very useful KIP.  In general I am with Guozhang on
>> > >> > the purpose of the three timeouts.
>> > >> > 1) session timeout for consumer liveness,
>> > >> > 2) process timeout (or maybe we should rename it to
>> > >> max.poll.interval.ms)
>> > >> > for application liveness,
>> > >> > 3) rebalance timeout for faster rebalance in some failure cases.
>> > >> >
>> > >> > It seems the current discussion is mainly about whether we need 3)
>> > >> > as a separate timeout or not. The current KIP proposal is to combine
>> > >> > 2) and 3), i.e. just use the process timeout as the rebalance
>> > >> > timeout. That means we need to either increase the rebalance timeout
>> > >> > to let it adapt to the process timeout, or the reverse. It would be
>> > >> > helpful to understand the impact of these two cases. Here are my two
>> > >> > cents.
>> > >> >
>> > >> > For users who are consuming data from Kafka, usually they either
>> care
>> > >> about
>> > >> > throughput or care about latency.
>> > >> >
>> > >> > If users care about latency, they would probably care more about
>> > >> > average latency than 99.99th percentile latency, which can be
>> > >> > affected by many causes more common than consumer failure. Because
>> > >> > all the timeouts we are discussing here only have an impact on the
>> > >> > 99.99th percentile latency, I don't think they would really make a
>> > >> > difference for latency-sensitive users.
>> > >> >
>> > >> > The majority of the use cases for Kafka Connect and Mirror Maker
>> are
>> > >> > throughput sensitive. Ewen raised a good example where Kafka
>> Connect
>> > >> needs
>> > >> > to process the previous data on rebalance therefore requires a
>> higher
>> > >> > rebalance timeout than process timeout. This is essentially the
>> same
>> > in
>> > >> > Mirror Maker, where each rebalance needs to flush all the messages
>> in
>> > >> the
>> > >> > accumulator in the producer. That could take some time depending on
>> > how
>> > >> > many messages are there. In this case, we may need to increase the
>> > >> process
>> > >> > timeout to make it the same as rebalance timeout. But this is
>> probably
>> > >> > fine. The downside of increasing process timeout is a longer
>> detection
>> > >> time
>> > >> > of a consumer failure.  Detecting a consumer failure a little later
>> > only
>> > >> > has limited impact because the rest of the consumers in the same
>> group
>> > >> are
>> > >> > still working fine. So the total throughput is unlikely to drop
>> > >> > significantly. As long as the rebalance is not taking longer it
>> should
>> > >> be
>> > >> > fine. The reason we care more about how fast rebalance can finish
>> is
>> > >> > because during rebalance no consumer in the group is consuming,
>> i.e.
>> > >> > throughput is zero. So we want to make the rebalance finish as
>> quickly
>> > >> as
>> > >> > possible.
>> > >> >
>> > >> > Compared with increasing the process timeout to match the rebalance
>> > >> > timeout, the more common case seems to be where the user wants a
>> > >> > longer process timeout but a smaller rebalance timeout. I am more
>> > >> > worried about this case, where we have to shoehorn the rebalance
>> > >> > timeout into the process timeout. For users who care about
>> > >> > throughput, that might cause the rebalance to take unnecessarily
>> > >> > long. Admittedly this only has an impact when a consumer has a
>> > >> > problem during a rebalance, but depending on how long the process
>> > >> > timeout is set, the rebalance could potentially take forever, like
>> > >> > Guozhang mentioned.
>> > >> >
>> > >> > I agree with Guozhang that we can start with 1) and 2) and add 3)
>> > later
>> > >> if
>> > >> > needed. But adding rebalance timeout is more involved than just
>> > adding a
>> > >> > configuration. That also means the rebalance has to be done in the
>> > >> > background heartbeat thread. Hence we have to synchronize rebalance
>> > and
>> > >> > consumer.poll() like we did in old consumer. Otherwise user may
>> lose
>> > >> > messages if auto commit is enabled, or the manual commit might fail
>> > >> after a
>> > >> > consumer.poll() because the partitions might have been reassigned.
>> So
>> > >> > having a separate rebalance timeout also potentially means a big
>> > change
>> > >> to
>> > >> > the users as well.
>> > >> >
>> > >> > Thanks,
>> > >> >
>> > >> > Jiangjie (Becket) Qin
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Fri, Jun 3, 2016 at 11:45 AM, Jason Gustafson <
>> ja...@confluent.io>
>> > >> > wrote:
>> > >> >
>> > >> > > Hey Ewen,
>> > >> > >
>> > >> > > I confess your comments caught me off guard. It never occurred
>> to me
>> > >> that
>> > >> > > anyone would ask for a rebalance timeout so that it could be set
>> > >> _larger_
>> > >> > > than the process timeout. Even with buffered or batch
>> processing, I
>> > >> would
>> > >> > > usually expect flushing before a rebalance to take no more time
>> > than a
>> > >> > > periodic flush. Otherwise, I'd probably try to see if there was
>> some
>> > >> > > workload I could push into periodic flushes so that rebalances
>> could
>> > >> > > complete faster. But supposing this isn't possible or practical
>> in
>> > >> some
>> > >> > > cases, I'm wondering how limiting it would be in practice to have
>> > only
>> > >> > the
>> > >> > > one timeout in this case? I'm a little reluctant to add the
>> > additional
>> > >> > > timeout since I think most users would not have a strong need to
>> > keep
>> > >> a
>> > >> > > tight bound on normal processing time. (I'm also reminded that
>> Jay
>> > >> > > mentioned he might have to dock everyone's pay 5% for each new
>> > >> timeout we
>> > >> > > introduce ;-)
>> > >> > >
>> > >> > > Thanks,
>> > >> > > Jason
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> > > On Thu, Jun 2, 2016 at 7:30 PM, Guozhang Wang <
>> wangg...@gmail.com>
>> > >> > wrote:
>> > >> > >
>> > >> > > > Hi Ewen,
>> > >> > > >
>> > >> > > > I think you are right, the rebalance process could potentially
>> > >> involve
>> > >> > > all
>> > >> > > > the delayed compute / IO. More specifically, this is what I
>> think
>> > of
>> > >> > the
>> > >> > > > rebalance process:
>> > >> > > >
>> > >> > > > 1. Coordinator decides to rebalance; starts ticking based on the
>> > >> > > > rebalance timeout.
>> > >> > > > 2. Consumer realizes a rebalance is needed when calling poll();
>> > >> > > > triggers onPartitionsRevoked().
>> > >> > > > 3. Consumer sends JoinGroupRequest.
>> > >> > > > 4. Coordinator sends JoinGroupResponse; starts ticking on the leader.
>> > >> > > > 5. Leader computes the assignment and sends SyncGroupRequest.
>> > >> > > > 6. Coordinator sends SyncGroupResponse; starts ticking on the session
>> > >> > > > timeout.
>> > >> > > > 7. Consumer gets the new assignment; triggers onPartitionsAssigned().
>> > >> > > >
>> > >> > > > In the above process, delayed compute / IO is usually done in step
>> > >> > > > 2); workload initialization is usually done in step 7); and some
>> > >> > > > admin work (like in Kafka Streams) is likely to be done in step
>> > >> > > > 5). In the current KIP proposal, the rebalance timeout on the
>> > >> > > > coordinator starts ticking at step 1) for everyone in the group
>> > >> > > > and stops ticking at step 3); it starts ticking again for the
>> > >> > > > leader at step 4) and stops at step 5). In this case the delayed
>> > >> > > > compute / IO contained in step 2) is covered by this rebalance
>> > >> > > > timeout.
>> > >> > > >
>> > >> > > > That being said, I think that in the "worst case" the time to
>> > >> > > > process a single record would still be similar to the time to
>> > >> > > > rebalance, since both could result in completing all delayed
>> > >> > > > compute / IO so far. And since the "processing timeout" is used to
>> > >> > > > cover the worst case, it should still be OK?
>> > >> > > >
>> > >> > > >
>> > >> > > > Guozhang
>> > >> > > >
>> > >> > > >
>> > >> > > >
>> > >> > > >
>> > >> > > > On Thu, Jun 2, 2016 at 5:55 PM, Ewen Cheslack-Postava <
>> > >> > e...@confluent.io
>> > >> > > >
>> > >> > > > wrote:
>> > >> > > >
>> > >> > > > > Jason,
>> > >> > > > >
>> > >> > > > > I've been thinking about this more in terms of something like
>> > >> > Connect.
>> > >> > > I
>> > >> > > > > think the rebalance timeout may be a bit different from the
>> > >> process
>> > >> > > > > timeout, and even the process timeout is a bit of a misnomer.
>> > >> > > > >
>> > >> > > > > We sort of talk about the process timeout as if it can be an
>> > >> > indicator
>> > >> > > of
>> > >> > > > > maximum processing time for a record/batch. This makes sense
>> > for a
>> > >> > case
>> > >> > > > of
>> > >> > > > > a data-dependent load (i.e. you can only load some data from
>> > slow
>> > >> > > storage
>> > >> > > > > after seeing some data) where that load might be very large
>> > >> compared
>> > >> > to
>> > >> > > > > normal processing time. It also makes sense if you have auto
>> > >> commit
>> > >> > > > enabled
>> > >> > > > > because you need to be completely finished processing the
>> data
>> > >> before
>> > >> > > > > calling poll() again, so that time before you call another
>> > >> consumer
>> > >> > API
>> > >> > > > > actually reflects processing time.
>> > >> > > > >
>> > >> > > > > It might make less sense in cases like streams (or any other
>> > app)
>> > >> > that
>> > >> > > > > batch writes to disk, or connectors that "process" a message
>> by
>> > >> > > enqueuing
>> > >> > > > > the data, but won't commit offsets until data is flushed,
>> > possibly
>> > >> > > during
>> > >> > > > > some other, much later iteration of processing. In this case
>> I
>> > >> think
>> > >> > > > > processing time and rebalance time could potentially differ
>> > >> > > > significantly.
>> > >> > > > > During normal processing, you can potentially pipeline quite
>> a
>> > >> bit,
>> > >> > > > > buffering up changes, flushing as needed, but then only
>> > committing
>> > >> > once
>> > >> > > > > flushing is complete. But rebalancing is different then --
>> you
>> > >> *must*
>> > >> > > > > finish flushing all the data or manually choose to discard
>> the
>> > >> data
>> > >> > > > > (presumably by doing something like watching for the process
>> > >> timeout
>> > >> > > you
>> > >> > > > > set and bailing early, only committing the offsets for data
>> > you've
>> > >> > > > > flushed). If you have lots of data built up, the cost for
>> > >> rebalancing
>> > >> > > > could
>> > >> > > > > be a *lot* higher than the maximum time you would otherwise
>> see
>> > >> > between
>> > >> > > > > calls to consumer APIs to indicate processing progress.
>> > >> > > > >
>> > >> > > > > The thing that makes these cases different is that processing
>> > >> isn't
>> > >> > > > > actually tied to calls to the consumer API. You can queue up
>> /
>> > >> > > pipeline /
>> > >> > > > > defer some of the work. (By the way, this is currently a
>> > >> limitation
>> > >> > of
>> > >> > > > sink
>> > >> > > > > connectors that I'm not thrilled about -- offset commit
>> > requires a
>> > >> > full
>> > >> > > > > flush, whereas some coordination with the sink connector to
>> not
>> > >> > > require a
>> > >> > > > > full flush except on rebalances would be much nicer, albeit
>> more
>> > >> > > > difficult
>> > >> > > > > for sink connectors to implement.)
>> > >> > > > >
>> > >> > > > > -Ewen
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >
>> > >> > > > > On Thu, Jun 2, 2016 at 5:14 PM, Jason Gustafson <
>> > >> ja...@confluent.io>
>> > >> > > > > wrote:
>> > >> > > > >
>> > >> > > > > > Hey Guozhang,
>> > >> > > > > >
>> > >> > > > > > I'm actually not too concerned about the time spent in the
>> > >> > rebalance
>> > >> > > > > > callback specifically. Both it and regular processing time
>> in
>> > >> the
>> > >> > > poll
>> > >> > > > > loop
>> > >> > > > > > will delay the rebalance and keep joined consumers idle.
>> > >> However,
>> > >> > if
>> > >> > > we
>> > >> > > > > > expose the rebalance timeout, then it would give users the
>> > >> option
>> > >> > to
>> > >> > > > > > effectively disable the process timeout while still keeping a
>> > >> maximum
>> > >> > > > bound
>> > >> > > > > > on the rebalance time. If the consumer cannot complete its
>> > >> > processing
>> > >> > > > > fast
>> > >> > > > > > enough and rejoin, then it would be evicted. This provides
>> > >> > something
>> > >> > > > like
>> > >> > > > > > (2) since the other consumers in the group would be able to
>> > >> > complete
>> > >> > > > the
>> > >> > > > > > rebalance and resume work while the evicted consumer would
>> > have
>> > >> to
>> > >> > > > > rollback
>> > >> > > > > > progress. This is not too different from rebalancing in the
>> > >> > > background
>> > >> > > > > > which also typically would cause commit failure and
>> rollback
>> > >> > (though
>> > >> > > at
>> > >> > > > > > least the consumer stays in the group).
>> > >> > > > > >
>> > >> > > > > > Now that I'm thinking about it more, I'm not sure this
>> would
>> > be
>> > >> a
>> > >> > > great
>> > >> > > > > > facility to depend on in practice. It might be OK if just
>> one
>> > or
>> > >> > two
>> > >> > > of
>> > >> > > > > the
>> > >> > > > > > consumers fall out of the group during the rebalance, but
>> if
>> > >> half
>> > >> > the
>> > >> > > > > group
>> > >> > > > > > is regularly getting evicted, it would be a problem. So
>> even
>> > if
>> > >> we
>> > >> > > > expose
>> > >> > > > > > the rebalance timeout, the user is still going to have to
>> set
>> > it
>> > >> > with
>> > >> > > > > some
>> > >> > > > > > idea in mind about how long processing should take.
>> > >> > > > > >
>> > >> > > > > > Thanks,
>> > >> > > > > > Jason
>> > >> > > > > >
>> > >> > > > > > On Thu, Jun 2, 2016 at 2:46 PM, Guozhang Wang <
>> > >> wangg...@gmail.com>
>> > >> > > > > wrote:
>> > >> > > > > >
>> > >> > > > > > > Hi Jason,
>> > >> > > > > > >
>> > >> > > > > > > With the current usage pattern of:
>> > >> > > > > > >
>> > >> > > > > > > while(..) {
>> > >> > > > > > >
>> > >> > > > > > >   consumer.poll(/* where rebalance happens */)
>> > >> > > > > > >
>> > >> > > > > > >   // process messages
>> > >> > > > > > > }
>> > >> > > > > > >
>> > >> > > > > > > ----------
>> > >> > > > > > >
>> > >> > > > > > > And since the rebalance is still on the caller thread, not the
>> > >> > > > > > > background thread, if the coordinator decides to rebalance
>> > >> > > > > > > while the user thread is still processing messages, there is
>> > >> > > > > > > no option but to go with 1), right? I think your / Onur's
>> > >> > > > > > > point here, which I agree with, is that by reusing the process
>> > >> > > > > > > timeout as the rebalance timeout, if the rebalance callback
>> > >> > > > > > > could take longer than processing a batch, users need to set
>> > >> > > > > > > the timeout value to the higher of the two, i.e. the callback
>> > >> > > > > > > latency, which will make detection of processing stalls less
>> > >> > > > > > > effective, right?
>> > >> > > > > > >
>> > >> > > > > > > As I mentioned in my previous email, I feel that this case of
>> > >> > > > > > > the callback function taking longer than processing a batch
>> > >> > > > > > > would not be frequent in practice, and the processing timeout
>> > >> > > > > > > would usually be a good upper bound on the callback function
>> > >> > > > > > > latency. If that is true, I'd suggest we keep the current
>> > >> > > > > > > proposal and not add a third timeout config to cover this case.
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > Guozhang
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > On Thu, Jun 2, 2016 at 10:40 AM, Jason Gustafson <
>> > >> > > ja...@confluent.io
>> > >> > > > >
>> > >> > > > > > > wrote:
>> > >> > > > > > >
>> > >> > > > > > > > Hey Guozhang,
>> > >> > > > > > > >
>> > >> > > > > > > > I think the problem is that users may not want to
>> > sacrifice
>> > >> > > > rebalance
>> > >> > > > > > > > latency because of uncertainty around processing time.
>> As
>> > >> soon
>> > >> > > as a
>> > >> > > > > > > > rebalance begins, there are basically two choices:
>> > >> > > > > > > >
>> > >> > > > > > > > 1. Block the rebalance until all consumers have
>> finished
>> > >> their
>> > >> > > > > current
>> > >> > > > > > > > processing.
>> > >> > > > > > > > 2. Let all consumers rebalance and "rollback" any
>> > processing
>> > >> > that
>> > >> > > > > could
>> > >> > > > > > > not
>> > >> > > > > > > > be committed before the rebalance completes.
>> > >> > > > > > > >
>> > >> > > > > > > > If you choose option (1), then you have an incentive to
>> > >> keep a
>> > >> > > > > > relatively
>> > >> > > > > > > > tight bound on process.timeout.ms in order to reduce
>> the
>> > >> > > > worst-case
>> > >> > > > > > idle
>> > >> > > > > > > > time during a rebalance. But if you fail to set it high
>> > >> enough,
>> > >> > > > then
>> > >> > > > > > > you'll
>> > >> > > > > > > > get spurious rebalances during normal processing. I
>> think
>> > >> Onur
>> > >> > is
>> > >> > > > > > saying
>> > >> > > > > > > > that this still sort of sucks for users. On the other
>> > hand,
>> > >> if
>> > >> > > (2)
>> > >> > > > is
>> > >> > > > > > > > acceptable, then users will have more freedom to err on
>> > the
>> > >> > high
>> > >> > > > side
>> > >> > > > > > > when
>> > >> > > > > > > > setting process.timeout.ms, or even disable it
>> entirely.
>> > >> They
>> > >> > > will
>> > >> > > > > > have
>> > >> > > > > > > to
>> > >> > > > > > > > deal with rolling back any progress which cannot be
>> > >> committed
>> > >> > > after
>> > >> > > > > the
>> > >> > > > > > > > rebalance completes, but maybe this is less of a
>> problem
>> > for
>> > >> > some
>> > >> > > > > > users?
>> > >> > > > > > > >
>> > >> > > > > > > > Thanks,
>> > >> > > > > > > > Jason
>> > >> > > > > > > >
>> > >> > > > > > > >
>> > >> > > > > > > >
>> > >> > > > > > > > On Wed, Jun 1, 2016 at 10:23 PM, Guozhang Wang <
>> > >> > > wangg...@gmail.com
>> > >> > > > >
>> > >> > > > > > > wrote:
>> > >> > > > > > > >
>> > >> > > > > > > > > Hi Onur, Jason:
>> > >> > > > > > > > >
>> > >> > > > > > > > > Here are some thoughts about reusing process timeout
>> as
>> > >> > > > server-side
>> > >> > > > > > > > > rebalance timeout: First of all, my understanding is
>> > that
>> > >> > > > > > > > >
>> > >> > > > > > > > > 1) session timeout is for detecting consumer crashes / hard
>> > >> > > > > > > > > failures (in this case the heartbeat thread will be dead as
>> > >> > > > > > > > > well, hence the coordinator will notice within the session
>> > >> > > > > > > > > timeout value).
>> > >> > > > > > > > >
>> > >> > > > > > > > > 2) process timeout is for checking liveness of the user
>> > >> > > > > > > > > thread that calls the consumer as well as does the
>> > >> > > > > > > > > processing: when no consumer calls are made within the
>> > >> > > > > > > > > process timeout, the heartbeat thread stops working and
>> > >> > > > > > > > > hence the failure will be detected by the coordinator.
>> > >> > > > > > > > >
>> > >> > > > > > > > > 3) a potential server-side rebalance timeout would be used
>> > >> > > > > > > > > to detect consumer liveness during the rebalance period, in
>> > >> > > > > > > > > which the user thread is tied up with the "poll" call and
>> > >> > > > > > > > > also the callback function, to prevent a consumer that is
>> > >> > > > > > > > > slow / stalled in its rebalance callback from causing the
>> > >> > > > > > > > > rebalance to take forever.
>> > >> > > > > > > > >
>> > >> > > > > > > > > I think we generally have two cases in practice regarding
>> > >> > > > > > > > > 3) above: users either do almost nothing and hence should
>> > >> > > > > > > > > never be stalled (unless there is a long GC), or they do
>> > >> > > > > > > > > various external IOs for maintaining their own states, for
>> > >> > > > > > > > > example, which could take a long time or even cause the
>> > >> > > > > > > > > thread to stall. We do not need to worry too much about the
>> > >> > > > > > > > > former case, and as for the latter case, the process
>> > >> > > > > > > > > timeout value should usually be a good upper bound on the
>> > >> > > > > > > > > rebalance latency.
>> > >> > > > > > > > >
>> > >> > > > > > > > > That being said, if we observe that there is indeed a
>> > >> > > > > > > > > common usage where 2) and 3) would require very different
>> > >> > > > > > > > > timeout values, and the benefit outweighs the complexity of
>> > >> > > > > > > > > three timeout values, we can consider adding a third one
>> > >> > > > > > > > > then: it is easier to add more configs later.
>> > >> > > > > > > > >
>> > >> > > > > > > > >
>> > >> > > > > > > > > What do you think?
>> > >> > > > > > > > >
>> > >> > > > > > > > > Guozhang
>> > >> > > > > > > > >
>> > >> > > > > > > > >
>> > >> > > > > > > > > On Tue, May 31, 2016 at 2:35 PM, Jason Gustafson <
>> > >> > > > > ja...@confluent.io
>> > >> > > > > > >
>> > >> > > > > > > > > wrote:
>> > >> > > > > > > > >
>> > >> > > > > > > > > > Hey Onur,
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > Thanks for the detailed response. I think the
>> problem
>> > of
>> > >> > > > > > controlling
>> > >> > > > > > > > > > rebalance times is the main (known) gap in the
>> > proposal
>> > >> as
>> > >> > it
>> > >> > > > > > stands.
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > This burden goes away if you loosen the liveness
>> > >> property
>> > >> > by
>> > >> > > > > > having a
>> > >> > > > > > > > > > > required rebalance time and optional processing
>> time
>> > >> > where
>> > >> > > > > > > rebalance
>> > >> > > > > > > > > > > happens in the background thread as stated in the
>> > KIP.
>> > >> > > > > > > > > >
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > Just to clarify, the current KIP only allows
>> > rebalances
>> > >> to
>> > >> > > > > complete
>> > >> > > > > > > in
>> > >> > > > > > > > > the
>> > >> > > > > > > > > > foreground. What I suggested above in reply to
>> Grant
>> > was
>> > >> > that
>> > >> > > > we
>> > >> > > > > > > could
>> > >> > > > > > > > > add
>> > >> > > > > > > > > > a separate rebalance timeout setting, the behavior
>> I
>> > >> had in
>> > >> > > > mind
>> > >> > > > > > was
>> > >> > > > > > > to
>> > >> > > > > > > > > let
>> > >> > > > > > > > > > the consumer fall out of the group if the timeout
>> is
>> > >> > reached
>> > >> > > > > while
>> > >> > > > > > > the
>> > >> > > > > > > > > > consumer is still processing. I was specifically
>> > trying
>> > >> to
>> > >> > > > avoid
>> > >> > > > > > > moving
>> > >> > > > > > > > > the
>> > >> > > > > > > > > > rebalance to the background thread since this
>> > >> significantly
>> > >> > > > > > increases
>> > >> > > > > > > > the
>> > >> > > > > > > > > > complexity of the implementation. We'd also have to
>> > >> think
>> > >> > > about
>> > >> > > > > > > > > > compatibility a bit more. For example, what are the
>> > >> > > > implications
>> > >> > > > > of
>> > >> > > > > > > > > having
>> > >> > > > > > > > > > the rebalance listener execute in a separate
>> thread?
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > Putting that issue aside, I think we need to convince ourselves that
>> > >> > > > > > > > > > a separate rebalance timeout is really necessary since every new
>> > >> > > > > > > > > > timeout adds some conceptual noise which all users will see. My
>> > >> > > > > > > > > > thought in this KIP was that users who didn't want the burden of
>> > >> > > > > > > > > > tuning the process timeout could use a relatively large value without
>> > >> > > > > > > > > > a major impact because group rebalances themselves will typically be
>> > >> > > > > > > > > > infrequent. The main concern is for users who have highly variant
>> > >> > > > > > > > > > processing times and want to ensure a tight bound on rebalance times
>> > >> > > > > > > > > > (even if it means having to discard some processing that cannot be
>> > >> > > > > > > > > > completed before the rebalance finishes). These users will be left
>> > >> > > > > > > > > > trying to tune process.timeout.ms and max.poll.records, which is
>> > >> > > > > > > > > > basically the same position they are currently in. The problem is I
>> > >> > > > > > > > > > don't know how common this case is, so I'm not sure how it weighs
>> > >> > > > > > > > > > against the cost of having an additional timeout that needs to be
>> > >> > > > > > > > > > explained. We can always add the rebalance timeout later, but it will
>> > >> > > > > > > > > > be tough to remove once it's there. All the same, I'm not that keen on
>> > >> > > > > > > > > > another iteration of this problem, so if we believe this use case is
>> > >> > > > > > > > > > common enough, then maybe we should add it now.
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > Thanks,
>> > >> > > > > > > > > > Jason
>> > >> > > > > > > > > >
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > On Sat, May 28, 2016 at 3:10 AM, Onur Karaman <
>> > >> > > > > > > > > > onurkaraman.apa...@gmail.com>
>> > >> > > > > > > > > > wrote:
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > > Thanks for the KIP writeup, Jason.
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > Before anything else, I just wanted to point out that it's worth
>> > >> > > > > > > > > > > mentioning the "heartbeat.interval.ms" consumer config in the KIP for
>> > >> > > > > > > > > > > completeness. Today this config only starts to kick in if poll is
>> > >> > > > > > > > > > > called frequently enough. A separate heartbeat thread should make this
>> > >> > > > > > > > > > > config behave more like what people would expect: a separate thread
>> > >> > > > > > > > > > > sending heartbeats at the configured interval.
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > With this KIP, the relevant configs become:
>> > >> > > > > > > > > > > "max.poll.records" - already exists
>> > >> > > > > > > > > > > "session.timeout.ms" - already exists
>> > >> > > > > > > > > > > "heartbeat.interval.ms" - already exists
>> > >> > > > > > > > > > > "process.timeout.ms" - new
>> > >> > > > > > > > > > >
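A minimal sketch of how the four settings listed above might be sanity-checked together. The numeric values are illustrative assumptions, "process.timeout.ms" is the name proposed in this thread rather than a released config, and check_consumer_config is a hypothetical helper, not part of any Kafka client. The only rules encoded are the documented guidance that heartbeat.interval.ms should be lower than session.timeout.ms (and typically no more than a third of it).

```python
def check_consumer_config(cfg):
    """Return a list of human-readable problems; empty if none were found."""
    problems = []
    heartbeat = cfg["heartbeat.interval.ms"]
    session = cfg["session.timeout.ms"]
    if heartbeat >= session:
        problems.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat * 3 > session:
        problems.append("heartbeat.interval.ms is usually at most 1/3 of session.timeout.ms")
    if cfg["max.poll.records"] < 1:
        problems.append("max.poll.records must be at least 1")
    if cfg["process.timeout.ms"] <= 0:
        problems.append("process.timeout.ms must be positive")
    return problems


# Illustrative values only: a low session timeout for fast failure
# detection, and a much larger processing timeout for slow batches.
example = {
    "max.poll.records": 500,
    "session.timeout.ms": 10_000,
    "heartbeat.interval.ms": 3_000,
    "process.timeout.ms": 300_000,
}
```

With a background heartbeat thread, the session timeout in this sketch no longer has to cover record processing, which is why it can stay small while the processing timeout is large.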
>> > >> > > > > > > > > > > After reading the KIP several times, I think it would be helpful to be
>> > >> > > > > > > > > > > more explicit in the desired outcome. Is it trying to make faster
>> > >> > > > > > > > > > > best/average/worst case rebalance times? Is it trying to make the
>> > >> > > > > > > > > > > clients need less configuration tuning?
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > Also it seems that brokers probably still want to enforce minimum and
>> > >> > > > > > > > > > > maximum rebalance timeouts just as with the minimum and maximum session
>> > >> > > > > > > > > > > timeouts so DelayedJoins don't stay in purgatory indefinitely. So we'd
>> > >> > > > > > > > > > > add new "group.min.rebalance.timeout.ms" and
>> > >> > > > > > > > > > > "group.max.rebalance.timeout.ms" broker configs which again might need
>> > >> > > > > > > > > > > to be brought up in the KIP. Let's say we add these bounds. A
>> > >> > > > > > > > > > > side-effect of having broker-side bounds on rebalance timeouts in
>> > >> > > > > > > > > > > combination with Java clients that make process timeouts the same as
>> > >> > > > > > > > > > > rebalance timeouts is that the broker effectively dictates the max
>> > >> > > > > > > > > > > processing time allowed between poll calls. This gotcha exists right
>> > >> > > > > > > > > > > now with today's broker-side bounds on session timeouts. So I'm not
>> > >> > > > > > > > > > > really convinced that the proposal gets rid of this complication
>> > >> > > > > > > > > > > mentioned in the KIP.
>> > >> > > > > > > > > > >
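A rough sketch of the side effect described above, assuming the broker rejects out-of-range rebalance timeouts the same way it rejects out-of-range session timeouts today. The two bound names are the ones proposed in this message, not existing broker configs, and the function names and values are hypothetical.

```python
def accepts_rebalance_timeout(requested_ms, broker_cfg):
    """Model the broker-side range check on a member's rebalance timeout."""
    low = broker_cfg["group.min.rebalance.timeout.ms"]   # proposed name
    high = broker_cfg["group.max.rebalance.timeout.ms"]  # proposed name
    return low <= requested_ms <= high


def max_processing_time_ms(broker_cfg):
    """If the client uses its process timeout as the rebalance timeout,
    the broker's upper bound also caps the time allowed between polls."""
    return broker_cfg["group.max.rebalance.timeout.ms"]


# Illustrative bounds only.
broker = {
    "group.min.rebalance.timeout.ms": 6_000,
    "group.max.rebalance.timeout.ms": 300_000,
}
```

Under these assumptions a consumer that wants ten minutes between polls would be rejected at join time, even though the limit it hit is a broker setting rather than one of its own.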
>> > >> > > > > > > > > > > I think the main question to ask is: does the KIP actually make a
>> > >> > > > > > > > > > > difference?
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > It looks like this KIP improves rebalance times specifically when the
>> > >> > > > > > > > > > > client currently has processing times large enough to force larger
>> > >> > > > > > > > > > > session timeouts and heartbeat intervals to not be honored. Separating
>> > >> > > > > > > > > > > session timeouts from processing time means clients can keep their
>> > >> > > > > > > > > > > "session.timeout.ms" low so the coordinator can quickly detect process
>> > >> > > > > > > > > > > failure, and honoring a low "heartbeat.interval.ms" on the separate
>> > >> > > > > > > > > > > heartbeat thread means clients will be quickly notified of group
>> > >> > > > > > > > > > > membership and subscription changes - all without placing difficult
>> > >> > > > > > > > > > > expectations on processing time. But even so, rebalancing through the
>> > >> > > > > > > > > > > calling thread means the slowest processing client in the group will
>> > >> > > > > > > > > > > still be the rate limiting step when looking at rebalance times.
>> > >> > > > > > > > > > >
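A toy model of the "rate limiting step" point above, under simplifying assumptions that are not from the KIP: every member rejoins only once its current batch is done, the rebalance ends when all members have rejoined or the rebalance timeout expires, and members still processing at that point are dropped. The function name is hypothetical.

```python
def simulate_rebalance(remaining_processing_ms, rebalance_timeout_ms):
    """Return (rebalance duration in ms, set of members dropped from the group).

    remaining_processing_ms maps member id -> time until that member
    finishes its current batch and can rejoin.
    """
    dropped = {
        member
        for member, remaining in remaining_processing_ms.items()
        if remaining > rebalance_timeout_ms
    }
    if dropped:
        # The coordinator waits the full timeout before giving up on them.
        return rebalance_timeout_ms, dropped
    # Otherwise the slowest member determines when the rebalance can finish.
    return max(remaining_processing_ms.values(), default=0), dropped
```

For example, with members needing 100 ms, 5000 ms and 200 ms and a 30000 ms timeout, this model gives a 5000 ms rebalance: the fast members wait on the slow one.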
>> > >> > > > > > > > > > > From a usability perspective, the burden still seems like it will be
>> > >> > > > > > > > > > > tuning the processing time to keep the "progress liveness" happy during
>> > >> > > > > > > > > > > rebalances while still having reasonable upper bounds on rebalance
>> > >> > > > > > > > > > > times. It still looks like users have to do almost the exact same
>> > >> > > > > > > > > > > tricks as today when the group membership changes due to slow
>> > >> > > > > > > > > > > processing times even though all the consumers are alive and the
>> > >> > > > > > > > > > > topics haven't changed:
>> > >> > > > > > > > > > > 1. Increase the rebalance timeout to give more time for record
>> > >> > > > > > > > > > > processing (the difference compared to today is that we bump the
>> > >> > > > > > > > > > > rebalance timeout instead of session timeout).
>> > >> > > > > > > > > > > 2. Reduce the number of records handled on each iteration with
>> > >> > > > > > > > > > > max.poll.records.
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > This burden goes away if you loosen the liveness property by having a
>> > >> > > > > > > > > > > required rebalance time and optional processing time where rebalance
>> > >> > > > > > > > > > > happens in the background thread as stated in the KIP.
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > On Thu, May 26, 2016 at 12:40 PM, Jason Gustafson <ja...@confluent.io>
>> > >> > > > > > > > > > > wrote:
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > > Hey Grant,
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > Thanks for the feedback. I'm definitely open to including heartbeat()
>> > >> > > > > > > > > > > > in this KIP. One thing we should be clear about is what the behavior
>> > >> > > > > > > > > > > > of heartbeat() should be when the group begins rebalancing. I think
>> > >> > > > > > > > > > > > there are basically two options:
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > 1. heartbeat() simply keeps heartbeating even if the group has started
>> > >> > > > > > > > > > > > rebalancing.
>> > >> > > > > > > > > > > > 2. heartbeat() completes the rebalance itself.
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > With the first option, when processing takes longer than the rebalance
>> > >> > > > > > > > > > > > timeout, the member will fall out of the group which will cause an
>> > >> > > > > > > > > > > > offset commit failure when it finally finishes. However, if processing
>> > >> > > > > > > > > > > > finishes before the rebalance completes, then offsets can still be
>> > >> > > > > > > > > > > > committed. On the other hand, if heartbeat() completes the rebalance
>> > >> > > > > > > > > > > > itself, then you'll definitely see the offset commit failure for any
>> > >> > > > > > > > > > > > records being processed. So the first option is sort of biased toward
>> > >> > > > > > > > > > > > processing completion while the latter is biased toward rebalance
>> > >> > > > > > > > > > > > completion.
>> > >> > > > > > > > > > > >
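The trade-off between the two options above can be written down as a small decision function. This is only a sketch under a simplifying assumption that is not from the thread: the rebalance starts when processing starts and stays open until this member rejoins or the rebalance timeout expires. The function name and option numbering are illustrative; heartbeat() itself is the API being proposed in the discussion, not an existing consumer method.

```python
def offsets_committable(option, processing_ms, rebalance_timeout_ms):
    """Can offsets for the in-flight batch still be committed?

    option 1: heartbeat() keeps heartbeating during the rebalance.
    option 2: heartbeat() completes the rebalance itself.
    """
    if option == 1:
        # The member stays in the group only if it finishes in time.
        return processing_ms <= rebalance_timeout_ms
    if option == 2:
        # The member has already rejoined, so the in-flight batch belongs
        # to the previous generation and its commit fails.
        return False
    raise ValueError("option must be 1 or 2")
```

In this model option 1 only loses work when processing outlasts the rebalance timeout, while option 2 always loses the in-flight batch, which matches the "biased toward processing completion" versus "biased toward rebalance completion" description.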
>> > >> > > > > > > > > > > > I'm definitely not a fan of the second option since it takes away the
>> > >> > > > > > > > > > > > choice to finish processing before rejoining. However, I do see some
>> > >> > > > > > > > > > > > benefit in the first option if the user wants to keep rebalance time
>> > >> > > > > > > > > > > > low and doesn't mind being kicked out of the group if processing takes
>> > >> > > > > > > > > > > > longer during a rebalance. This may be a reasonable tradeoff since
>> > >> > > > > > > > > > > > consumer groups are presumed to be stable most of the time. A better
>> > >> > > > > > > > > > > > option in that case might be to expose the rebalance timeout to the
>> > >> > > > > > > > > > > > user directly since it would allow the user to use an essentially
>> > >> > > > > > > > > > > > unbounded process.timeout.ms for highly variant processing while still
>> > >> > > > > > > > > > > > keeping rebalance time limited. Of course, it would be another timeout
>> > >> > > > > > > > > > > > for the user to understand...
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > Thanks,
>> > >> > > > > > > > > > > > Jason
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > On Thu, May 26, 2016 at 8:19 AM, Grant Henke <ghe...@cloudera.com>
>> > >> > > > > > > > > > > > wrote:
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > > Hi Jason,
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > Thanks for writing up a proposal (and a thorough one)! This is
>> > >> > > > > > > > > > > > > something that I had been thinking about this week too as I have run
>> > >> > > > > > > > > > > > > into it more than a handful of times now.
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > I like the idea of having a larger processing timeout; that timeout,
>> > >> > > > > > > > > > > > > in unison with max.poll.records, should in many cases provide a
>> > >> > > > > > > > > > > > > reasonable assurance that the consumer will stay alive.
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > In rejected alternatives "Add a separate API the user can call to
>> > >> > > > > > > > > > > > > indicate liveness" is listed. I think a heartbeat api could be added
>> > >> > > > > > > > > > > > > along with these new timeout configurations and used for "advanced"
>> > >> > > > > > > > > > > > > use cases where the processing time could be highly variant and less
>> > >> > > > > > > > > > > > > predictable. I think a place where we might use the heartbeat api in
>> > >> > > > > > > > > > > > > Kafka is MirrorMaker.
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > Today, I have seen people trying to find ways to leverage the existing
>> > >> > > > > > > > > > > > > api to "force" heartbeats by:
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > 1. Call poll to get the batch of records to process
>> > >> > > > > > > > > > > > > 2. Call pause on all partitions
>> > >> > > > > > > > > > > > > 3. Process the record batch
>> > >> > > > > > > > > > > > > 3a. While processing, periodically call poll (which is essentially
>> > >> > > > > > > > > > > > > just a heartbeat since the partitions are paused and it returns no
>> > >> > > > > > > > > > > > > records)
>> > >> > > > > > > > > > > > > 4. Commit offsets and un-pause
>> > >> > > > > > > > > > > > > 5. Repeat from 1
>> > >> > > > > > > > > > > > >
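The workaround in the numbered steps above can be sketched as a loop. FakeConsumer is a hypothetical in-memory stand-in written for this example, not the real client; its method names loosely mirror the poll, pause, resume and commit calls the steps refer to, and keepalive_every is an assumed knob for how often step 3a runs.

```python
class FakeConsumer:
    """Hypothetical stand-in that records the calls made to it."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.paused = False
        self.calls = []

    def poll(self):
        self.calls.append("poll")
        if self.paused or not self._batches:
            return []  # paused partitions return no records
        return self._batches.pop(0)

    def pause(self):
        self.calls.append("pause")
        self.paused = True

    def resume(self):
        self.calls.append("resume")
        self.paused = False

    def commit(self):
        self.calls.append("commit")


def process_with_keepalive(consumer, handle, keepalive_every=2):
    """Run steps 1 to 4 once; return the number of records processed."""
    batch = consumer.poll()                       # step 1
    if not batch:
        return 0
    consumer.pause()                              # step 2
    for i, record in enumerate(batch, start=1):   # step 3
        handle(record)
        if i % keepalive_every == 0:
            consumer.poll()                       # step 3a: acts as a heartbeat
    consumer.commit()                             # step 4
    consumer.resume()
    return len(batch)
```

Step 5 is then just calling process_with_keepalive in a loop until it returns 0. The sketch also shows why people found this awkward: liveness depends on the application remembering to call poll at the right moments.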
>> > >> > > > > > > > > > > > > Thanks,
>> > >> > > > > > > > > > > > > Grant
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > On Wed, May 25, 2016 at 6:32 PM, Jason Gustafson <ja...@confluent.io>
>> > >> > > > > > > > > > > > > wrote:
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > > Hi All,
>> > >> > > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > > One of the persistent problems we see with the new consumer is the use
>> > >> > > > > > > > > > > > > > of the session timeout in order to ensure progress. Whenever there is
>> > >> > > > > > > > > > > > > > a delay in message processing which exceeds the session timeout, no
>> > >> > > > > > > > > > > > > > heartbeats can be sent and the consumer is removed from the group. We
>> > >> > > > > > > > > > > > > > seem to hit this problem everywhere the consumer is used (including
>> > >> > > > > > > > > > > > > > Kafka Connect and Kafka Streams) and we don't always have a great
>> > >> > > > > > > > > > > > > > solution. I've written a KIP to address this problem here:
>> > >> > > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
>> > >> > > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > > Have a look and let me know what you think.
>> > >> > > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > > Thanks,
>> > >> > > > > > > > > > > > > > Jason
>> > >> > > > > > > > > > > > > >
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > > > --
>> > >> > > > > > > > > > > > > Grant Henke
>> > >> > > > > > > > > > > > > Software Engineer | Cloudera
>> > >> > > > > > > > > > > > > gr...@cloudera.com | twitter.com/gchenke |
>> > >> > > > > > > > > > linkedin.com/in/granthenke
>> > >> > > > > > > > > > > > >
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > >
>> > >> > > > > > > > >
>> > >> > > > > > > > >
>> > >> > > > > > > > >
>> > >> > > > > > > > > --
>> > >> > > > > > > > > -- Guozhang
>> > >> > > > > > > > >
>> > >> > > > > > > >
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > --
>> > >> > > > > > > -- Guozhang
>> > >> > > > > > >
>> > >> > > > > >
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >
>> > >> > > > > --
>> > >> > > > > Thanks,
>> > >> > > > > Ewen
>> > >> > > > >
>> > >> > > >
>> > >> > > >
>> > >> > > >
>> > >> > > > --
>> > >> > > > -- Guozhang
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
