Thanks for the answer, Jason.

Does the consumer have any way to rejoin the consumer group?

2016-01-19 18:27 GMT+01:00 Jason Gustafson <ja...@confluent.io>:

> Hey Franco,
>
> This time I'll answer briefly ;)
>
> 1) Heartbeats also get invoked when you call another blocking operation
> such as commitSync().
> 2) If all consumers in the group die, the coordinator doesn't really do
> anything other than clean up some group state. In particular, it does not
> remove offset commits.
>
> -Jason
>
> On Sun, Jan 17, 2016 at 11:03 AM, Franco Giacosa <fgiac...@gmail.com>
> wrote:
>
> > Hi Jason,
> >
> > Thanks for the detailed explanation, I hope that KIP-41 gets added fast.
> >
> > A few questions:
> > (1) Is polling the only way to send a heartbeat? If I call poll(0),
> > does it renew the session?
> > (2) What happens to the coordinator if all consumers die?
> >
> > Franco.
> >
> >
> >
> >
> > 2016-01-15 19:30 GMT+01:00 Jason Gustafson <ja...@confluent.io>:
> >
> > > Hi Franco,
> > >
> > > The new consumer combines the functionality of the older simple and
> > > high-level consumers. When used in simple mode, you have to assign the
> > > partitions that you want to read from using assign(). In this case, the
> > > consumer works alone and not in a group. Alternatively, if you use the
> > > subscribe() API, then a special Kafka broker (known as the coordinator)
> > > will coordinate all consumers with the same groupId to distribute the
> > > partitions of all subscribed topics among them. Each partition will be
> > > assigned automatically to exactly one consumer in the group. This is what
> > > is meant by automatic group management.
> > >
> > > Group management in the new consumer basically works like this: all
> > > members send a request to the coordinator indicating that they need to
> > > join the group. Once joined, the consumer begins sending heartbeats to
> > > the coordinator. If no heartbeat is received before the expiration of
> > > the session timeout (configured with session.timeout.ms), then the
> > > coordinator marks the consumer dead and asks all other consumers in the
> > > group to rejoin (so that partitions can be reassigned). I've skipped
> > > some details, but those are the important points.
> > >
> > > Now, a relatively controversial feature of the new consumer is its
> > > single-threaded design. Instead of sending fetches and heartbeats in a
> > > background thread, all IO is done in the foreground when the user calls
> > > poll(). This implies, in particular, that poll() must be called at
> > > least once per session timeout interval in order to send heartbeats. If
> > > the session timeout expires between consecutive calls to poll(), then
> > > the coordinator will think the consumer is dead and it will reassign its
> > > partitions to other members. If you then try to commit offsets from that
> > > consumer, the coordinator will reject the request since it has already
> > > kicked it out of the group. When this happens, KafkaConsumer throws
> > > CommitFailedException since there is no way that the commit can succeed.
> > > And yes, you can see this in commitAsync as well (in the
> > > OffsetCommitCallback).
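The expiry rule described above can be sketched as a small simulation. This is a toy model under stated assumptions, not Kafka's coordinator implementation: the class ToyCoordinator and its methods are hypothetical, time is passed in explicitly as milliseconds, and a commit does not count as a heartbeat here.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only; every name in this class is hypothetical.
class ToyCoordinator {
    private final long sessionTimeoutMs;
    // Time (ms) of the last heartbeat for each member still in the group.
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    ToyCoordinator(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Joining (or rejoining) the group counts as the member's first heartbeat.
    void join(String memberId, long nowMs) {
        lastHeartbeatMs.put(memberId, nowMs);
    }

    // Returns false if the member has already been dropped from the group.
    boolean heartbeat(String memberId, long nowMs) {
        expireMembers(nowMs);
        if (!lastHeartbeatMs.containsKey(memberId)) {
            return false;
        }
        lastHeartbeatMs.put(memberId, nowMs);
        return true;
    }

    // A commit is accepted only from a member that is still in the group.
    boolean commit(String memberId, long nowMs) {
        expireMembers(nowMs);
        return lastHeartbeatMs.containsKey(memberId);
    }

    // Drop every member whose last heartbeat is older than the session timeout.
    private void expireMembers(long nowMs) {
        lastHeartbeatMs.values().removeIf(last -> nowMs - last > sessionTimeoutMs);
    }
}
```

With a 30s timeout, a member whose last heartbeat was at 10s can still commit at 35s, but a commit at 50s is rejected. That rejection is the situation in which the real client reports CommitFailedException.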
> > >
> > > Of course, the main reason why the consumer would fail to poll often
> > > enough is that message processing is also done in the same thread. If it
> > > takes longer than the session timeout to handle the previous batch of
> > > messages, then the consumer gets kicked out. It may seem a little strange
> > > to have both message processing and heartbeating in the same thread, but
> > > ensuring consumer liveness was one of the design objectives. If
> > > heartbeats were sent from a background thread, then the message processor
> > > could die while the heartbeat thread remained active. In this case, the
> > > consumer would hold onto the partitions indefinitely until it had been
> > > shut down.
> > >
> > > So what can you do if you see this? First, confirm that it's actually
> > > happening. Record the interval between subsequent calls to poll() and
> > > check if the CommitFailedException is thrown after the session timeout
> > > has expired (the default is 30s, by the way). If it is, then there are
> > > basically two choices for dealing with this problem at the moment: 1)
> > > increase the session timeout to give more time for message processing,
> > > and 2) move the processing to another thread. If you do the latter, you
> > > have to be careful about coordinating commits with the message
> > > processing thread and about how to notify it of rebalances (we have a
> > > ConsumerRebalanceListener that you can work with for this purpose). You
> > > can also try to tweak "max.partition.fetch.bytes", but this can be
> > > dangerous if you don't know the maximum size of the messages you have to
> > > handle.
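The first step above (recording the interval between calls to poll()) could look roughly like the following. This is a minimal sketch: PollIntervalMonitor is a hypothetical helper, not part of the Kafka client, and it takes timestamps as arguments so that it does not depend on any consumer API.

```java
// Hypothetical helper: tracks the gap between successive poll() calls.
class PollIntervalMonitor {
    private final long sessionTimeoutMs;
    private long lastPollMs = -1;      // -1 means no poll has been recorded yet
    private long longestIntervalMs = 0;

    PollIntervalMonitor(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Call right before each poll(); returns the gap since the previous
    // call in milliseconds (0 for the very first call).
    long recordPoll(long nowMs) {
        long interval = (lastPollMs < 0) ? 0 : nowMs - lastPollMs;
        lastPollMs = nowMs;
        longestIntervalMs = Math.max(longestIntervalMs, interval);
        return interval;
    }

    // True if any recorded gap was longer than the session timeout.
    boolean exceededSessionTimeout() {
        return longestIntervalMs > sessionTimeoutMs;
    }

    long longestIntervalMs() {
        return longestIntervalMs;
    }
}
```

In a poll loop, one would call recordPoll(System.currentTimeMillis()) just before poll() and log the returned value. If a CommitFailedException shows up only after exceededSessionTimeout() becomes true, slow message processing is the likely cause.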
> > >
> > > And for what it's worth, we're planning to add a new configuration
> > > "max.poll.records" to set an upper limit on the number of messages
> > > returned from poll() (assuming that KIP-41 is approved). This can make
> > > it easier to limit the message processing time so that there is less
> > > risk of running over the session timeout.
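As a back-of-the-envelope illustration of why a cap on the records per poll() helps, the arithmetic could be sketched as follows. PollBudget, its parameters, and the idea of a safety fraction are assumptions made for this example and not something Kafka provides.

```java
// Hypothetical helper: estimates how many records can be processed
// between two poll() calls without running over the session timeout.
class PollBudget {
    // safetyFraction is the share of the session timeout we allow
    // ourselves to spend on processing (for example 0.5 for half of it).
    static int maxRecordsPerPoll(long sessionTimeoutMs,
                                 long worstCaseMsPerRecord,
                                 double safetyFraction) {
        if (sessionTimeoutMs <= 0 || worstCaseMsPerRecord <= 0
                || safetyFraction <= 0 || safetyFraction > 1) {
            throw new IllegalArgumentException("arguments out of range");
        }
        long budgetMs = (long) (sessionTimeoutMs * safetyFraction);
        return (int) Math.min(Integer.MAX_VALUE, budgetMs / worstCaseMsPerRecord);
    }
}
```

For a 30s session timeout, a worst case of 50 ms per record, and half of the timeout as the budget, this gives 15000 / 50 = 300 records per poll().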
> > >
> > > Hope that helps!
> > >
> > > -Jason
> > >
> > >
> > >
> > >
> > > On Fri, Jan 15, 2016 at 3:26 AM, Franco Giacosa <fgiac...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > The documentation for commitSync says the following about
> > > > CommitFailedException:
> > > >
> > > >  * @throws org.apache.kafka.clients.consumer.CommitFailedException
> > > >  *         if the commit failed and cannot be retried.
> > > >  *         This can only occur if you are using automatic group
> > > >  *         management with {@link #subscribe(List)}
> > > >
> > > > 1. Can someone explain to me what automatic group management means? And
> > > > if it's just the consumer group, why might the commit fail?
> > > >
> > > > 2. In a consumer group, each consumer has one or more partitions
> > > > assigned, right? So if the consumers are not sharing partitions, why
> > > > might the commit fail in a way that cannot be retried?
> > > >
> > > > 3. This also happens in commitAsync, right? I don't see it in the
> > > > documentation, which is why I am asking.
> > > >
> > > > Thanks,
> > > > Franco.
> > > >
> > >
> >
>
