To pile on a little bit, the API is designed to ensure consumer liveness so
that partitions cannot be held indefinitely by a defunct process. Since
heartbeating and message processing are done in the same thread, the
consumer needs to demonstrate "progress" by calling poll() often enough not
to get kicked out. What "often enough" means is dictated by the session
timeout, which is 30s by default. If you fail to call poll() before the
session timer expires, then the broker will assume that the member is dead
and begin a rebalance. If you need more time to handle messages, increase
session.timeout.ms in your configuration. The only downside to a higher
timeout in general is that it will take longer to detect other kinds of
failures (such as process crashes or network partitions).
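
For reference, here's a minimal sketch of the poll loop (the broker address,
group id, and topic name below are just placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        props.put("group.id", "my-group");                 // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("session.timeout.ms", "30000");          // the default; raise it if handling can take longer

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder topic
        while (true) {
            // Heartbeats only go out from inside poll(), so the work done in this
            // loop body has to finish comfortably within session.timeout.ms.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}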

This was the initial design, but it hasn't worked out quite as well as we
would have liked, at least not in all situations. The first problem in 0.9
is that you don't have a direct way to control the amount of data that can
be returned in a call to poll(), which makes it difficult to estimate the
session timeout. You can set max.partition.fetch.bytes and, based on an
estimate for the total number of partitions that you need to read, try to
come up with a guess, but this is kind of hard in practice. So in 0.10
we've introduced a new setting max.poll.records, which lets you set an
explicit bound on the number of messages that need to be handled on each
poll iteration. The hope is that you can set this to a reasonably low value
so that you never risk hitting the session timeout.
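
For example, if you know roughly how long a single record takes in the worst
case, you can budget against the session timeout (the numbers here are only
illustrative):

// 0.10 only: bound the number of records returned by a single poll() call.
// 500 is just an illustrative value; pick one based on your per-record cost, so that
//   max.poll.records * worst-case time per record  <  session.timeout.ms
props.put("max.poll.records", "500");
props.put("session.timeout.ms", "30000");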

It's also worthwhile understanding a little bit about how the rebalance
mechanism works. After a consumer group is created, each consumer begins
sending heartbeat messages to a special broker known as the coordinator.
When a new consumer joins the group (or when the session timeout of an
existing member expires), the other members find out about it through the
error code in the heartbeat response. The group coordination protocol
basically implements a synchronization barrier. When a rebalance begins,
all members of the group have to join the barrier for it to complete. So if
you want to reduce the impact from rebalancing, then you need to ensure
that all members can join the barrier as soon as possible after it begins.
For this, we expose heartbeat.interval.ms, but note that we can't actually
send heartbeats any faster than the poll() interval itself because
everything is done from the same thread. So if you want to always have fast
rebalances, then the target for setting the processing bound should be the
heartbeat interval instead of the session timeout.
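
Concretely, that means keeping heartbeat.interval.ms well below
session.timeout.ms (a third or less is the usual rule of thumb) and sizing
each poll iteration against the heartbeat interval (again, the values are
only an example):

props.put("session.timeout.ms", "30000");
props.put("heartbeat.interval.ms", "3000");  // default; must stay well below the session timeout
// For fast rebalances, budget each iteration against the heartbeat interval instead:
//   max.poll.records * worst-case time per record  <  heartbeat.interval.ms
props.put("max.poll.records", "100");        // illustrative value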

We've made some other small improvements to make unexpected rebalancing
less of a problem in practice. For example, we modified the protocol
behavior to allow offset commits to serve as effective heartbeats, which
wasn't the case in 0.9. However, we're still encountering situations where
there's really no clear way to estimate the session timeout other than
somewhat exhaustive testing. Even max.poll.records doesn't help when the
processing cost of a single message can vary disproportionately (as is
sometimes the case in Kafka Streams, which uses the consumer internally).
You could set a ridiculously large session timeout in these cases, but that
also guarantees a long time to recover from hard failures. I think this basically means
that these use cases need a separate notion of liveness, which they have a
bit more control over. For example, we could expose a method on the consumer
which applications can call from any thread to indicate that they're still
alive. I'm working on a KIP right now to address this problem, so look for
it in the next few weeks.
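
In the meantime, one workaround when a single record can take arbitrarily
long is to hand the work off to another thread, pause the assigned
partitions, and keep calling poll() so that heartbeats continue to flow. A
rough sketch (this assumes the 0.10 pause/resume signatures, and process()
is just a stand-in for your own handler):

// Sketch only: the executor and process() stand in for however your
// application does its slow, per-record work on a separate thread.
ExecutorService executor = Executors.newSingleThreadExecutor();

ConsumerRecords<String, String> records = consumer.poll(100);
if (!records.isEmpty()) {
    Future<?> done = executor.submit(() -> process(records));
    consumer.pause(consumer.assignment());  // stop fetching, but keep the session alive
    while (!done.isDone()) {
        consumer.poll(100);                 // returns nothing while paused, but still heartbeats
    }
    consumer.resume(consumer.assignment());
    consumer.commitSync();
}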

Thanks,
Jason

On Sat, May 14, 2016 at 8:05 AM, sahitya agrawal <sahitya2...@gmail.com>
wrote:

> Thanks Cees and Abhinav, will give this trick a try and update if it helped
> for my case.
>
> Regards,
> Sahitya Agrawal
>
> On Fri, May 13, 2016 at 9:36 PM, Cees de Groot <c...@pagerduty.com> wrote:
>
> > What Abhinav said. To give some context: the common cause of frequent
> > rebalances is that your consumer takes too long to process batches. As
> > long as you don't call into the consumer library, heartbeats aren't
> > sent, so if you take too long working through a batch, the broker
> > thinks your consumer is gone and starts re-balancing. The message
> > batch under processing never gets marked as done, so after
> > rebalancing, things start over from the same spot.
> >
> > So the solution is to either make the batches smaller or the heartbeat
> > interval longer. There are fancier solutions for when this doesn't
> > work, but it should do the trick for most normal cases.
> >
> > On Fri, May 13, 2016 at 10:20 AM, Abhinav Solan <abhinav.so...@gmail.com> wrote:
> > > Hi Sahitya,
> > >
> > > Try reducing max.partition.fetch.bytes in your consumer.
> > > Then also increase heartbeat.interval.ms, this might help to delay the
> > > consumer rebalance if your inbound process is taking more time than this.
> > >
> > > - Abhinav
> > >
> > > On Fri, May 13, 2016 at 5:42 AM sahitya agrawal <sahitya2...@gmail.com> wrote:
> > >
> > >> Hi,
> > >>
> > >> I am using the new Kafka consumer API (0.9.0.0). I created 100
> > >> partitions of a topic and started only one consumer to consume. Many
> > >> times, in the consumer logs I see a lot of rebalancing activity and no
> > >> messages are consumed due to that.
> > >>
> > >> Is this a known issue? Please let me know if somebody can help with
> > >> regard to this.
> > >>
> > >> My Consumer config:
> > >>         props.put("zookeeper.session.timeout.ms", "10000");
> > >>         props.put("rebalance.backoff.ms","10000");
> > >>         props.put("zookeeper.sync.time.ms","200");
> > >>         props.put("rebalance.max.retries","10");
> > >>         props.put("enable.auto.commit", "false");
> > >>         props.put("consumer.timeout.ms","20000");
> > >>         props.put("auto.offset.reset", "smallest");
> > >>
> > >> Thanks,
> > >> Sahitya
> > >>
> >
> >
> >
> > --
> > Cees de Groot
> > Principal Software Engineer
> > PagerDuty, Inc.
> >
>
