Hi Franco,

The new consumer combines the functionality of the older simple and
high-level consumers. When used in simple mode, you have to assign the
partitions that you want to read from using assign(). In this case, the
consumer works alone and not in a group. Alternatively, if you use the
subscribe() API, then a special Kafka broker (known as the coordinator)
will coordinate all consumers with the same groupId to distribute the
partitions of all subscribed topics among them. Each partition will be
assigned automatically to exactly one consumer in the group. This is what
is meant by automatic group management.

Group management in the new consumer basically works like this: all members
send a request to the coordinator indicating that they need to join the
group. Once joined, the consumer begins sending heartbeats to the
coordinator. If no heartbeat is received before the expiration of the
session timeout (configured with session.timeout.ms), then the coordinator
marks the consumer dead and asks all other consumers in the group to rejoin
(so that partitions can be reassigned). I've skipped some details, but
those are the important points.

Now, a relatively controversial feature of the new consumer is its
single-threaded design. Instead of sending fetches and heartbeats in a
background thread, all IO is done in the foreground when the user calls
poll(). In particular, this means that poll() must be called at least once
per session timeout interval in order to send heartbeats. If the
session timeout expires between consecutive calls to poll(), then the
coordinator will think the consumer is dead and it will reassign its
partitions to other members. If you then try to commit offsets from that
consumer, the coordinator will reject the request since it has already
kicked it out of the group. When this happens, KafkaConsumer throws
CommitFailedException since there is no way that the commit can succeed.
And yes, you can see this in commitAsync as well (in the
OffsetCommitCallback).
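
To make the sequence above concrete, here is a minimal sketch of the
coordinator's bookkeeping in plain Java. It is a toy model, not Kafka code:
ToyCoordinator, SessionTimeoutDemo, and their methods are hypothetical names,
time is passed in explicitly so the run is deterministic, and expiry is
checked lazily on each request rather than by a timer as a real broker would.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of per-member session tracking (hypothetical, not the Kafka API).
final class ToyCoordinator {
    private final long sessionTimeoutMs;
    // memberId -> time (ms) at which the last heartbeat was received
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    ToyCoordinator(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Joining the group starts the member's session.
    void join(String memberId, long nowMs) {
        lastHeartbeat.put(memberId, nowMs);
    }

    // Returns false if the member's session has already expired.
    boolean heartbeat(String memberId, long nowMs) {
        expire(nowMs);
        if (!lastHeartbeat.containsKey(memberId)) {
            return false;
        }
        lastHeartbeat.put(memberId, nowMs);
        return true;
    }

    // A commit is accepted only from a member that is still in the group.
    boolean commit(String memberId, long nowMs) {
        expire(nowMs);
        return lastHeartbeat.containsKey(memberId);
    }

    // Drop every member whose last heartbeat is older than the session timeout.
    private void expire(long nowMs) {
        lastHeartbeat.values().removeIf(last -> nowMs - last > sessionTimeoutMs);
    }
}

final class SessionTimeoutDemo {
    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator(30_000);
        coordinator.join("consumer-1", 0);
        // poll() at t=10s sends a heartbeat in time.
        System.out.println(coordinator.heartbeat("consumer-1", 10_000)); // prints true
        // Processing then takes 45s, so the commit arrives at t=55s.
        System.out.println(coordinator.commit("consumer-1", 55_000));    // prints false
    }
}
```

The rejected commit in the second call corresponds to the case where
KafkaConsumer throws CommitFailedException.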

Of course the main reason why the consumer would fail to poll often enough
is that message processing is also done in the same thread. If it takes
longer than the session timeout to handle the previous batch of messages,
then the consumer gets kicked out. It may seem a little strange to have
both message processing and heartbeating in the same thread, but ensuring
consumer liveness was one of the design objectives. If heartbeats were sent
from a background thread, then the message processor could die while the
heartbeat thread remained active. In this case, the consumer would hold onto
the partitions indefinitely until it was shut down.

So what can you do if you see this? First, confirm that it's actually
happening. Record the interval between subsequent calls to poll() and check
if the CommitFailedException is thrown after the session timeout has
expired (the default is 30s by the way). If it is, then there are basically
two choices for dealing with this problem at the moment: 1) increase the
session timeout to give more time for message processing, and 2) move the
processing to another thread. If you do the latter, you have to be careful
about coordinating commits with the message processing thread and about how
to notify it of rebalances (we have a ConsumerRebalanceListener that you can
work with for this purpose). You can also try to tweak
"max.partition.fetch.bytes", but this can be dangerous if you don't know
the maximum size of the messages you have to handle.
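
One way to do the "confirm that it's actually happening" step is to record
the gap between poll() calls and compare the largest one against the session
timeout. The helper below is only a sketch: PollGapMonitor and its methods
are hypothetical names, not part of the Kafka client, and timestamps are
passed in so it can be exercised without a running consumer.

```java
// Hypothetical helper (not part of the Kafka client): tracks the longest
// interval between consecutive poll() calls.
final class PollGapMonitor {
    private final long sessionTimeoutMs;
    private long lastPollMs = -1; // -1 means poll() has not been recorded yet
    private long maxGapMs = 0;

    PollGapMonitor(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Call right before each poll(). Returns the gap since the previous
    // call, or 0 for the first call.
    long recordPoll(long nowMs) {
        long gap = lastPollMs < 0 ? 0 : nowMs - lastPollMs;
        lastPollMs = nowMs;
        maxGapMs = Math.max(maxGapMs, gap);
        return gap;
    }

    long maxGapMs() {
        return maxGapMs;
    }

    // True if some gap was long enough for the session to have expired.
    boolean exceededSessionTimeout() {
        return maxGapMs > sessionTimeoutMs;
    }
}
```

In a real poll loop you would call recordPoll(System.currentTimeMillis())
just before consumer.poll(...) and log the returned gap. If the
CommitFailedException only shows up after a gap longer than
session.timeout.ms, that confirms the diagnosis.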

And for what it's worth, we're planning to add a new configuration
"max.poll.records" to set an upper limit on the number of messages returned
from poll() (assuming that KIP-41 is approved). This can make it easier to
limit the message processing time so that there is less risk of running
over the session timeout.
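
As a rough illustration of why a record limit helps, the sketch below
estimates how many records one poll() could return while keeping worst-case
processing inside a fraction of the session timeout. PollBudget, the
per-record cost, and the safety factor are all assumptions for the example,
not Kafka settings.

```java
// Back-of-the-envelope sizing for a per-poll record limit (hypothetical).
final class PollBudget {
    // Largest batch whose worst-case processing time fits within
    // safetyFactor * sessionTimeoutMs.
    static int maxRecordsPerPoll(long sessionTimeoutMs,
                                 long worstCaseMsPerRecord,
                                 double safetyFactor) {
        if (worstCaseMsPerRecord <= 0) {
            throw new IllegalArgumentException("worstCaseMsPerRecord must be positive");
        }
        long budgetMs = (long) (sessionTimeoutMs * safetyFactor);
        return (int) Math.min(Integer.MAX_VALUE, budgetMs / worstCaseMsPerRecord);
    }
}
```

For example, with the default 30s session timeout, an assumed worst case of
50 ms per record, and half the timeout kept as headroom,
PollBudget.maxRecordsPerPoll(30_000, 50, 0.5) returns 300.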

Hope that helps!

-Jason




On Fri, Jan 15, 2016 at 3:26 AM, Franco Giacosa <fgiac...@gmail.com> wrote:

> Hi,
>
> on the documentation for commitSync it says the following about the
> CommitFailedException
>
>  * @throws org.apache.kafka.clients.consumer.CommitFailedException if the
> commit failed and cannot be retried.
>   * This can only occur if you are using automatic group management with
> {@link #subscribe(List)}
>
> 1. Can someone explain me what automatic group management means? and if its
> just the consumer group, why is that the commit may fail?
>
> 2.In a Consumer Group, each consumer has 1 or many partitions assigned
> right? So if the consumers are not sharing partitions why is that the
> commit may failed and cannot be retried?
>
> 3.This also happens in commitAsync right? I don't see it on the
> documentation so thats why I am asking.
>
> Thanks,
> Franco.
>
