To pile on a little bit, the API is designed to ensure consumer liveness so that partitions cannot be held indefinitely by a defunct process. Since heartbeating and message processing are done in the same thread, the consumer needs to demonstrate "progress" by calling poll() often enough not to get kicked out. What "often enough" means is dictated by the session timeout, which is 30s by default. If you fail to call poll() before the session timer expires, then the broker will assume that the member is dead and begin a rebalance. If you need more time to handle messages, increase session.timeout.ms in your configuration. The only downside to a higher timeout in general is that it will take longer to detect other kinds of failures (such as process crashes or network partitions).
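For illustration, here's a minimal sketch of the kind of poll loop this implies (the broker address, group id, topic, and values are placeholders, not recommendations):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Heartbeats are sent from poll(), so the work done per iteration must fit
        // comfortably within session.timeout.ms (30000 by default).
        props.put("session.timeout.ms", "30000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // hypothetical per-record handler
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // application-specific work goes here
    }
}

Keep in mind that the broker enforces group.min.session.timeout.ms and group.max.session.timeout.ms, so a much larger client-side timeout may also require a broker-side change.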
This was the initial design, but it hasn't worked out quite as well as we would have liked, at least not in all situations. The first problem in 0.9 is that you don't have a direct way to control the amount of data that can be returned from a call to poll(), which makes it difficult to estimate the session timeout. You can set max.partition.fetch.bytes and, based on an estimate of the total number of partitions you need to read, try to come up with a guess, but this is hard to do well in practice. So in 0.10 we've introduced a new setting, max.poll.records, which lets you set an explicit bound on the number of messages that have to be handled on each poll iteration. The hope is that you can set this to a reasonably low value so that you're never at risk of a session timeout.

It's also worth understanding a little bit about how the rebalance mechanism works. After a consumer group is created, each consumer begins sending heartbeat messages to a special broker known as the coordinator. When a new consumer joins the group (or when the session timeout of an existing member expires), the other members find out about it through the error code in the heartbeat response. The group coordination protocol basically implements a synchronization barrier: when a rebalance begins, all members of the group have to join the barrier for it to complete. So if you want to reduce the impact of rebalancing, you need to ensure that all members can join the barrier as soon as possible after it begins. For this, we expose heartbeat.interval.ms, but note that we can't actually send heartbeats any faster than the poll() interval itself, because everything is done from the same thread. So if you always want fast rebalances, the target for the processing bound should be the heartbeat interval rather than the session timeout.

We've made some other small improvements to make unexpected rebalancing less of a problem in practice. For example, we modified the protocol behavior to allow offset commits to serve as effective heartbeats, which wasn't the case in 0.9. However, we're still encountering situations where there's really no clear way to estimate the session timeout other than somewhat exhaustive testing. Even max.poll.records doesn't help when the impact of a single message can vary disproportionately (as is sometimes the case in Kafka Streams, which uses the consumer internally). You could set a ridiculously large session timeout in these cases, but that also guarantees a long time to recover from hard failures.

I think this basically means that these use cases need a separate notion of liveness, one that they have a bit more control over. For example, we could expose a method in the consumer which applications can call from any thread to signal that they're still around. I'm working on a KIP right now to address this problem, so look for it in the next few weeks.
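To make the 0.10 knobs above a bit more concrete, here's a rough sketch of a bounded poll loop with manual commits (all values and names are illustrative only, not recommendations):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BoundedPollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");
        // 0.10 only: bound the number of records returned by each poll() so the
        // per-iteration processing time is easier to keep under control.
        props.put("max.poll.records", "100");       // illustrative value
        // Target the heartbeat interval, not the session timeout, so the member
        // can rejoin the barrier quickly when a rebalance begins.
        props.put("heartbeat.interval.ms", "3000"); // illustrative value
        props.put("session.timeout.ms", "30000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // application-specific processing goes here
                }
                if (records.count() > 0) {
                    // In 0.10 the commit also acts as an effective heartbeat.
                    consumer.commitSync();
                }
            }
        }
    }
}

The point is just that max.poll.records bounds the work per iteration, and the manual commit after each batch gives the coordinator another sign of life.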
Thanks,
Jason

On Sat, May 14, 2016 at 8:05 AM, sahitya agrawal <sahitya2...@gmail.com> wrote:
> Thanks Cees and Abhinav, will give this trick a try and update if it helped
> for my case.
>
> Regards,
> Sahitya Agrawal
>
> On Fri, May 13, 2016 at 9:36 PM, Cees de Groot <c...@pagerduty.com> wrote:
>
> > What Abhinav said. To give some context: the common cause of frequent
> > rebalances is that your consumer takes too long to process batches. As
> > long as you don't call into the consumer library, heartbeats aren't
> > sent, so if you take too long working through a batch, the broker
> > thinks your consumer is gone and starts rebalancing. The message
> > batch under processing never gets marked as done, so after
> > rebalancing, things start over from the same spot.
> >
> > So the solution is to either make the batches smaller or the heartbeat
> > interval longer. There are fancier solutions for when this doesn't
> > work, but it should do the trick for most normal cases.
> >
> > On Fri, May 13, 2016 at 10:20 AM, Abhinav Solan <abhinav.so...@gmail.com>
> > wrote:
> > > Hi Sahitya,
> > >
> > > Try reducing max.partition.fetch.bytes in your consumer.
> > > Then also increase heartbeat.interval.ms; this might help to delay the
> > > consumer rebalance if your inbound process is taking more time than this.
> > >
> > > - Abhinav
> > >
> > > On Fri, May 13, 2016 at 5:42 AM sahitya agrawal <sahitya2...@gmail.com>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> I am using the new Kafka consumer API (0.9.0.0). I created 100 partitions
> > >> of a topic and started only one consumer to consume. Many times, I see a
> > >> lot of rebalancing activity in the consumer logs and no object is
> > >> consumed due to that.
> > >>
> > >> Is this a known issue? Please let me know if somebody can help with
> > >> regard to this.
> > >>
> > >> My consumer config:
> > >> props.put("zookeeper.session.timeout.ms", "10000");
> > >> props.put("rebalance.backoff.ms", "10000");
> > >> props.put("zookeeper.sync.time.ms", "200");
> > >> props.put("rebalance.max.retries", "10");
> > >> props.put("enable.auto.commit", "false");
> > >> props.put("consumer.timeout.ms", "20000");
> > >> props.put("auto.offset.reset", "smallest");
> > >>
> > >> Thanks,
> > >> Sahitya
> > >
> >
> > --
> > Cees de Groot
> > Principal Software Engineer
> > PagerDuty, Inc.
>