Hi Franco,

The new consumer combines the functionality of the older simple and high-level consumers. When used in simple mode, you have to assign the partitions you want to read from using assign(); in that case, the consumer works alone, not in a group. Alternatively, if you use the subscribe() API, then a special Kafka broker (known as the coordinator) will coordinate all consumers with the same group.id to distribute the partitions of all subscribed topics among them. Each partition will be assigned automatically to exactly one consumer in the group. This is what is meant by automatic group management.
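To make the two modes concrete, here's a rough sketch; the broker address, topic, and group name are placeholders:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-group"); // only meaningful for subscribe()
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // "Simple" mode: you pick the partitions yourself; no group, no coordinator
    consumer.assign(Arrays.asList(new TopicPartition("my-topic", 0)));

    // Group mode (instead of the above): the coordinator spreads the partitions
    // of "my-topic" across every consumer sharing the same group.id
    // consumer.subscribe(Arrays.asList("my-topic"));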
Group management in the new consumer basically works like this: all members send a request to the coordinator indicating that they want to join the group. Once joined, each consumer begins sending heartbeats to the coordinator. If no heartbeat is received before the session timeout expires (configured with session.timeout.ms), then the coordinator marks the consumer dead and asks all other consumers in the group to rejoin (so that partitions can be reassigned). I've skipped some details, but those are the important points.

Now, a relatively controversial feature of the new consumer is its single-threaded design. Instead of sending fetches and heartbeats in a background thread, all IO is done in the foreground when the user calls poll(). This implies, in particular, that poll() must be called at least as frequently as the session timeout in order to send heartbeats. If the session timeout expires between consecutive calls to poll(), then the coordinator will consider the consumer dead and will reassign its partitions to other members. If you then try to commit offsets from that consumer, the coordinator will reject the request since it has already kicked the consumer out of the group. When this happens, KafkaConsumer throws CommitFailedException since there is no way the commit can succeed. And yes, you can see this in commitAsync as well (in the OffsetCommitCallback).

Of course, the main reason the consumer would fail to poll often enough is that message processing is also done in the same thread. If it takes longer than the session timeout to handle the previous batch of messages, then the consumer gets kicked out of the group. It may seem a little strange to have both message processing and heartbeating in the same thread, but ensuring consumer liveness was one of the design objectives. If heartbeats were sent from a background thread, then the message processor could die while the heartbeat thread remained active. In that case, the consumer would hold onto its partitions indefinitely until it was shut down.

So what can you do if you see this? First, confirm that it's actually happening: record the interval between successive calls to poll() and check whether the CommitFailedException is thrown after the session timeout has expired (the default is 30s, by the way). The first sketch below shows one way to instrument this. If it is, then there are basically two choices for dealing with the problem at the moment:

1) increase the session timeout to give more time for message processing, or
2) move the processing to another thread.

If you do the latter, you have to be careful about coordinating commits with the message processing thread and about notifying it of rebalances; we have a ConsumerRebalanceListener that you can work with for this purpose (see the second sketch below). You can also try tweaking max.partition.fetch.bytes, but this can be dangerous if you don't know the maximum size of the messages you have to handle. And for what it's worth, we're planning to add a new configuration, max.poll.records, to set an upper limit on the number of messages returned from poll() (assuming that KIP-41 is approved). This should make it easier to bound the message processing time so that there is less risk of running over the session timeout.
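To illustrate, here's a minimal sketch of the usual single-threaded loop, instrumented to record the gap between polls. The broker address, topic, and the process() handler are made-up placeholders:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollIntervalCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("my-topic"));

            long lastPoll = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                long now = System.currentTimeMillis();
                // if this gap exceeds session.timeout.ms, the coordinator has
                // already marked this consumer dead
                System.out.println("ms since last poll: " + (now - lastPoll));
                lastPoll = now;

                for (ConsumerRecord<String, String> record : records)
                    process(record); // this is where the time goes

                try {
                    consumer.commitSync();
                } catch (CommitFailedException e) {
                    // the group already rebalanced and gave our partitions away;
                    // this commit can never succeed, so drop it and rejoin on
                    // the next poll()
                    System.err.println("commit failed: " + e.getMessage());
                }
            }
        }

        // stand-in for your real (possibly slow) message handling
        private static void process(ConsumerRecord<String, String> record) {
        }
    }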
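If you move processing to another thread, the rebalance listener is the hook for coordinating with it. This second sketch is only a skeleton: it reuses the consumer from the loop above, and the processor object and its methods are hypothetical stand-ins for your own handoff and offset bookkeeping:

    import java.util.Arrays;
    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // called before the group rebalances: stop the processing thread's
            // work on these partitions and commit what it has finished
            processor.drainAndCommit(partitions); // hypothetical
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // called after reassignment: safe to feed records for these
            // partitions to the processing thread again
            processor.accept(partitions); // hypothetical
        }
    });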
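And the configuration knobs mentioned above would look something like this; the values are arbitrary examples, and max.poll.records is only a proposal at this point:

    // give message processing more headroom (the default is 30000)
    props.put("session.timeout.ms", "60000");

    // cap the data returned per partition per fetch; risky if a single
    // message can be larger than this value (the default is 1048576)
    props.put("max.partition.fetch.bytes", "262144");

    // proposed in KIP-41; not available unless/until it is adopted:
    // props.put("max.poll.records", "100");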
Hope that helps!

-Jason

On Fri, Jan 15, 2016 at 3:26 AM, Franco Giacosa <fgiac...@gmail.com> wrote:

> Hi,
>
> On the documentation for commitSync it says the following about the
> CommitFailedException:
>
> * @throws org.apache.kafka.clients.consumer.CommitFailedException if the
> commit failed and cannot be retried.
> * This can only occur if you are using automatic group management with
> {@link #subscribe(List)}
>
> 1. Can someone explain to me what automatic group management means? And if
> it's just the consumer group, why is it that the commit may fail?
>
> 2. In a consumer group, each consumer has one or many partitions assigned,
> right? So if the consumers are not sharing partitions, why is it that the
> commit may fail and cannot be retried?
>
> 3. This also happens in commitAsync, right? I don't see it in the
> documentation, so that's why I'm asking.
>
> Thanks,
> Franco.