Hi Jason,

Thanks for the detailed explanation. I hope KIP-41 gets adopted soon.

A few questions:
(1) Is polling the only way to send a heartbeat? If I poll with poll(0),
does that renew the session?
(2) What happens to the coordinator if all consumers die?

Franco.




2016-01-15 19:30 GMT+01:00 Jason Gustafson <ja...@confluent.io>:

> Hi Franco,
>
> The new consumer combines the functionality of the older simple and
> high-level consumers. When used in simple mode, you have to assign the
> partitions that you want to read from using assign(). In this case, the
> consumer works alone and not in a group. Alternatively, if you use the
> subscribe() API, then a special Kafka broker (known as the coordinator)
> will coordinate all consumers with the same groupId to distribute the
> partitions of all subscribed topics among them. Each partition will be
> assigned automatically to exactly one consumer in the group. This is what
> is meant by automatic group management.
>
> Group management in the new consumer basically works like this: all members
> send a request to the coordinator indicating that they need to join the
> group. Once joined, the consumer begins sending heartbeats to the
> coordinator. If no heartbeat is received before the expiration of the
> session timeout (configured with session.timeout.ms), then the coordinator
> marks the consumer dead and asks all other consumers in the group to rejoin
> (so that partitions can be reassigned). I've skipped some details, but
> those are the important points.
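> The liveness check can be sketched in a few lines of plain Java. This is a
> toy model of the behavior described above, not the Kafka client's or
> broker's actual code; all names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the coordinator's session-timeout check: a member is marked
// dead if no heartbeat has arrived within the session timeout.
public class SessionTimeoutSketch {
    static final long SESSION_TIMEOUT_MS = 30_000; // session.timeout.ms default

    // member id -> timestamp (ms) of the last heartbeat received
    static Map<String, Long> lastHeartbeat = new HashMap<>();

    static void heartbeat(String member, long nowMs) {
        lastHeartbeat.put(member, nowMs);
    }

    // True if the member has been silent longer than the session timeout.
    static boolean isDead(String member, long nowMs) {
        Long last = lastHeartbeat.get(member);
        return last == null || nowMs - last > SESSION_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        heartbeat("consumer-1", 0);
        heartbeat("consumer-2", 0);
        heartbeat("consumer-1", 25_000);                  // consumer-1 kept polling
        System.out.println(isDead("consumer-1", 40_000)); // false: last seen 15s ago
        System.out.println(isDead("consumer-2", 40_000)); // true: silent for 40s
    }
}
```

> When isDead() fires for a member, the real coordinator additionally asks
> the remaining members to rejoin so the dead member's partitions can be
> reassigned.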
>
> Now, a relatively controversial feature of the new consumer is its
> single-threaded design. Instead of sending fetches and heartbeats in a
> background thread, all IO is done in the foreground when the user calls
> poll(). This implies, in particular, that poll() must be called at least as
> frequently as the session timeout in order to send heartbeats. If the
> session timeout expires between consecutive calls to poll(), then the
> coordinator will think the consumer is dead and it will reassign its
> partitions to other members. If you then try to commit offsets from that
> consumer, the coordinator will reject the request since it has already
> kicked it out of the group. When this happens, KafkaConsumer throws
> CommitFailedException since there is no way that the commit can succeed.
> And yes, you can see this in commitAsync as well (in the
> OffsetCommitCallback).
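> To make the "cannot be retried" part concrete, here is a toy model of the
> eviction-then-commit sequence. Again, this is illustrative plain Java, not
> Kafka's implementation; the CommitFailedException below is a stand-in for
> the real exception of the same name:

```java
import java.util.HashSet;
import java.util.Set;

// Toy model: once the coordinator has kicked a member out of the group,
// every commit from that member is rejected until it rejoins, so retrying
// the same commit can never succeed.
public class CommitRejectionSketch {
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String msg) { super(msg); }
    }

    static Set<String> groupMembers = new HashSet<>();

    // Called when a member misses the session timeout.
    static void evict(String member) {
        groupMembers.remove(member);
    }

    static void commitOffsets(String member, long offset) {
        if (!groupMembers.contains(member)) {
            // Retrying cannot help: the member must rejoin the group first.
            throw new CommitFailedException(member + " is no longer in the group");
        }
        // ...persist the offset...
    }

    public static void main(String[] args) {
        groupMembers.add("consumer-1");
        commitOffsets("consumer-1", 42);   // fine while still a member
        evict("consumer-1");               // poll() came too late
        try {
            commitOffsets("consumer-1", 43);
        } catch (CommitFailedException e) {
            System.out.println("commit rejected: " + e.getMessage());
        }
    }
}
```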
>
> Of course the main reason why the consumer would fail to poll often enough
> is that message processing is also done in the same thread. If it takes
> longer than the session timeout to handle the previous batch of messages,
> then the consumer gets kicked out. It may seem a little strange to have
> both message processing and heartbeating in the same thread, but ensuring
> consumer liveness was one of the design objectives. If heartbeats were sent
> from a background thread, then the message processor could die while the
> heartbeat thread remained active. In this case, the consumer would hold onto
> the partitions indefinitely until it had been shutdown.
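> That failure mode is easy to demonstrate with a toy background-heartbeat
> thread (illustrative names only; the real client deliberately does not
> work this way):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of why background heartbeating hurts liveness: the heartbeat
// thread keeps ticking even while the message processor is completely stuck.
public class BackgroundHeartbeatSketch {
    // Count heartbeats sent by a background thread while the caller (the
    // "message processor") is hung for hangMs milliseconds.
    static int heartbeatsWhileHung(long hangMs) throws InterruptedException {
        AtomicInteger heartbeats = new AtomicInteger();
        Thread heartbeater = new Thread(() -> {
            while (true) {
                heartbeats.incrementAndGet(); // coordinator still sees "alive"
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        heartbeater.setDaemon(true);
        heartbeater.start();

        Thread.sleep(hangMs);   // the message processor is stuck here
        heartbeater.interrupt();
        heartbeater.join();
        return heartbeats.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Despite zero processing progress, heartbeats kept flowing, so the
        // coordinator would never reassign this consumer's partitions.
        System.out.println(heartbeatsWhileHung(200) > 0);
    }
}
```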
>
> So what can you do if you see this? First, confirm that it's actually
> happening. Record the interval between subsequent calls to poll() and check
> if the CommitFailedException is thrown after the session timeout has
> expired (the default is 30s by the way). If it is, then there are basically
> two choices for dealing with this problem at the moment: 1) increase the
> session timeout to give more time for message processing, and 2) move the
> processing to another thread. If you do the latter of these, you have to be
> careful about coordinating commits with the message processing thread and
> about how to notify it of rebalances (we have a ConsumerRebalanceListener that
> you can work with for this purpose). You can also try to tweak
> "max.partition.fetch.bytes," but this can be dangerous if you don't know
> the maximum size of the messages you have to handle.
>
> And for what it's worth, we're planning to add a new configuration
> "max.poll.records" to set an upper limit on the number of messages returned
> from poll() (assuming that KIP-41 is approved). This can make it easier to
> limit the message processing time so that there is less risk of running
> over the session timeout.
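> For reference, the configs mentioned in this thread are set like any other
> consumer property. The values below are examples, not recommendations, and
> max.poll.records only exists if KIP-41 is adopted:

```java
import java.util.Properties;

// Sketch of the consumer configs discussed above; values are illustrative.
public class ConsumerConfigSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        // Give slow message processing more headroom (default is 30000).
        props.put("session.timeout.ms", "60000");
        // Smaller fetches mean smaller batches from poll(); risky if a single
        // message can exceed this limit.
        props.put("max.partition.fetch.bytes", "262144");
        // Proposed in KIP-41: caps the number of records poll() returns.
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("session.timeout.ms"));
    }
}
```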
>
> Hope that helps!
>
> -Jason
>
>
>
>
> On Fri, Jan 15, 2016 at 3:26 AM, Franco Giacosa <fgiac...@gmail.com>
> wrote:
>
> > Hi,
> >
> > on the documentation for commitSync it says the following about the
> > CommitFailedException
> >
> >  * @throws org.apache.kafka.clients.consumer.CommitFailedException if the
> > commit failed and cannot be retried.
> >   * This can only occur if you are using automatic group management with
> > {@link #subscribe(List)}
> >
> > 1. Can someone explain to me what automatic group management means? And if
> > it's just the consumer group, why is it that the commit may fail?
> >
> > 2. In a Consumer Group, each consumer has one or many partitions assigned,
> > right? So if the consumers are not sharing partitions, why is it that the
> > commit may fail and cannot be retried?
> >
> > 3. This also happens in commitAsync, right? I don't see it in the
> > documentation, so that's why I am asking.
> >
> > Thanks,
> > Franco.
> >
>
