Hey Grant,

Thanks for the feedback. I'm definitely open to including heartbeat() in this KIP. One thing we should be clear about is what the behavior of heartbeat() should be when the group begins rebalancing. I think there are basically two options:
1. heartbeat() simply keeps heartbeating even if the group has started rebalancing.
2. heartbeat() completes the rebalance itself.

With the first option, when processing takes longer than the rebalance timeout, the member will fall out of the group, which will cause an offset commit failure when it finally finishes. However, if processing finishes before the rebalance completes, then offsets can still be committed. On the other hand, if heartbeat() completes the rebalance itself, then you'll definitely see the offset commit failure for any records being processed. So the first option is sort of biased toward processing completion while the latter is biased toward rebalance completion.

I'm definitely not a fan of the second option since it takes away the choice to finish processing before rejoining. However, I do see some benefit in the first option if the user wants to keep rebalance time low and doesn't mind being kicked out of the group if processing takes longer during a rebalance. This may be a reasonable tradeoff since consumer groups are presumed to be stable most of the time. A better option in that case might be to expose the rebalance timeout to the user directly, since it would allow the user to use an essentially unbounded process.timeout.ms for highly variant processing while still keeping rebalance time limited. Of course, it would be another timeout for the user to understand...

Thanks,
Jason

On Thu, May 26, 2016 at 8:19 AM, Grant Henke <ghe...@cloudera.com> wrote:

> Hi Jason,
>
> Thanks for writing up a proposal (and a thorough one)! This is something
> that I had been thinking about this week too as I have run into it more
> than a handful of times now.
>
> I like the idea of having a larger processing timeout. That timeout, in
> unison with max.poll.records, should in many cases provide a reasonable
> assurance that the consumer will stay alive.
>
> In rejected alternatives "Add a separate API the user can call to indicate
> liveness" is listed. I think a heartbeat API could be added along with
> these new timeout configurations and used for "advanced" use cases where
> the processing time could be highly variant and less predictable. I think a
> place where we might use the heartbeat API in Kafka is MirrorMaker.
>
> Today, I have seen people trying to find ways to leverage the existing API
> to "force" heartbeats by:
>
> 1. Calling poll to get the batch of records to process
> 2. Calling pause on all partitions
> 3. Processing the record batch
> 3a. While processing, periodically calling poll (which is essentially just
> a heartbeat, since the consumer is paused and poll returns no records)
> 4. Committing offsets and un-pausing
> 5. Repeating from 1
>
> Thanks,
> Grant
>
> On Wed, May 25, 2016 at 6:32 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hi All,
> >
> > One of the persistent problems we see with the new consumer is the use of
> > the session timeout in order to ensure progress. Whenever there is a delay
> > in message processing which exceeds the session timeout, no heartbeats can
> > be sent and the consumer is removed from the group. We seem to hit this
> > problem everywhere the consumer is used (including Kafka Connect and Kafka
> > Streams) and we don't always have a great solution. I've written a KIP to
> > address this problem here:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> >
> > Have a look and let me know what you think.
> >
> > Thanks,
> > Jason
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
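
For anyone skimming the thread, here is a rough sketch of the pause-and-poll workaround Grant lists in steps 1 to 4. To keep it runnable without a broker, it uses a hypothetical PausableConsumer interface and an in-memory RecordingConsumer fake. Neither is part of the Kafka client; they only stand in for the poll, pause, resume, and commit calls the steps mention, and pollEvery is an assumed knob for how often step 3a runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the consumer calls used by the workaround.
// This is NOT the real KafkaConsumer interface; names are illustrative only.
interface PausableConsumer {
    List<String> poll();   // returns records, or an empty list while paused
    void pauseAll();       // stop fetching from all assigned partitions
    void resumeAll();      // un-pause all partitions
    void commit();         // commit offsets for the processed batch
}

// In-memory fake that records each call so the call order can be inspected.
class RecordingConsumer implements PausableConsumer {
    final List<String> calls = new ArrayList<>();
    private final List<String> batch;
    private boolean paused = false;
    private boolean delivered = false;

    RecordingConsumer(List<String> batch) {
        this.batch = batch;
    }

    @Override
    public List<String> poll() {
        calls.add(paused ? "poll(paused)" : "poll");
        if (paused || delivered) {
            return new ArrayList<>();   // nothing is returned while paused
        }
        delivered = true;
        return new ArrayList<>(batch);
    }

    @Override
    public void pauseAll() {
        paused = true;
        calls.add("pauseAll");
    }

    @Override
    public void resumeAll() {
        paused = false;
        calls.add("resumeAll");
    }

    @Override
    public void commit() {
        calls.add("commit");
    }
}

class PausedPollLoop {
    // One pass through steps 1 to 4; returns the number of records processed.
    static int processOneBatch(PausableConsumer consumer, int pollEvery) {
        if (pollEvery <= 0) {
            throw new IllegalArgumentException("pollEvery must be positive");
        }
        List<String> records = consumer.poll();   // step 1: fetch a batch
        consumer.pauseAll();                      // step 2: pause all partitions
        int processed = 0;
        for (String record : records) {           // step 3: process the batch
            handle(record);
            processed++;
            if (processed % pollEvery == 0) {
                consumer.poll();                  // step 3a: poll while paused
            }
        }
        consumer.commit();                        // step 4: commit offsets...
        consumer.resumeAll();                     // ...and un-pause
        return processed;
    }

    // Placeholder for application-specific processing.
    static void handle(String record) {
    }

    public static void main(String[] args) {
        RecordingConsumer consumer =
                new RecordingConsumer(Arrays.asList("a", "b", "c", "d"));
        int processed = processOneBatch(consumer, 2);
        System.out.println(processed + " records, calls: " + consumer.calls);
    }
}
```

Step 5 is just calling processOneBatch in a loop. With a real consumer, the polls in step 3a are what keep the member alive during long processing, which is the behavior a dedicated heartbeat() call would make explicit.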