Hi Jason,

Thanks for the detailed explanation. I hope that KIP-41 gets added quickly.
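For reference, here is a minimal sketch of the consumer settings discussed in Jason's reply quoted below, assuming a plain java.util.Properties config. ConsumerTuning is a hypothetical helper, and the broker address, group name and numeric values are illustrative assumptions, not recommendations. max.poll.records is only understood by clients that include KIP-41.

```java
import java.util.Properties;

// Hypothetical helper: consumer settings for slow message processing.
// All values are illustrative assumptions, not recommended defaults.
final class ConsumerTuning {
    static Properties slowProcessingConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "example-group");           // hypothetical group name
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Choice 1: allow more time between poll() calls before the coordinator
        // marks this consumer dead. The broker enforces an upper bound
        // (group.max.session.timeout.ms), which may need raising as well.
        props.put("session.timeout.ms", "60000");

        // Smaller fetches mean smaller batches per poll(). With the client
        // discussed here, a message larger than this may not be fetchable,
        // which is the danger mentioned in the thread.
        props.put("max.partition.fetch.bytes", "262144");

        // Upper bound on records returned by one poll(); only honored by
        // clients that include KIP-41.
        props.put("max.poll.records", "100");
        return props;
    }
}
```

The resulting Properties would be passed to the KafkaConsumer constructor along with whatever else your setup needs.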
A few questions:

(1) Is poll() the only way to send a heartbeat? If I call poll(0), does that renew the session?
(2) What happens to the coordinator if all consumers die?

Franco.

2016-01-15 19:30 GMT+01:00 Jason Gustafson <ja...@confluent.io>:
> Hi Franco,
>
> The new consumer combines the functionality of the older simple and
> high-level consumers. When used in simple mode, you have to assign the
> partitions that you want to read from using assign(). In this case, the
> consumer works alone and not in a group. Alternatively, if you use the
> subscribe() API, then a special Kafka broker (known as the coordinator)
> will coordinate all consumers with the same groupId to distribute the
> partitions of all subscribed topics among them. Each partition will be
> assigned automatically to exactly one consumer in the group. This is what
> is meant by automatic group management.
>
> Group management in the new consumer basically works like this: all members
> send a request to the coordinator indicating that they need to join the
> group. Once joined, the consumer begins sending heartbeats to the
> coordinator. If no heartbeat is received before the expiration of the
> session timeout (configured with session.timeout.ms), then the coordinator
> marks the consumer dead and asks all other consumers in the group to rejoin
> (so that partitions can be reassigned). I've skipped some details, but
> those are the important points.
>
> Now, a relatively controversial feature of the new consumer is its
> single-threaded design. Instead of sending fetches and heartbeats in a
> background thread, all IO is done in the foreground when the user calls
> poll(). This implies, in particular, that poll() must be called at least
> once per session timeout in order to send heartbeats. If the
> session timeout expires between consecutive calls to poll(), then the
> coordinator will think the consumer is dead and it will reassign its
> partitions to other members.
> If you then try to commit offsets from that
> consumer, the coordinator will reject the request since it has already
> kicked it out of the group. When this happens, KafkaConsumer throws
> CommitFailedException since there is no way that the commit can succeed.
> And yes, you can see this in commitAsync as well (in the
> OffsetCommitCallback).
>
> Of course the main reason why the consumer would fail to poll often enough
> is that message processing is also done in the same thread. If it takes
> longer than the session timeout to handle the previous batch of messages,
> then the consumer gets kicked out. It may seem a little strange to have
> both message processing and heartbeating in the same thread, but ensuring
> consumer liveness was one of the design objectives. If heartbeats were sent
> from a background thread, then the message processor could die while the
> heartbeat thread remained active. In this case, the consumer would hold onto
> the partitions indefinitely until it had been shut down.
>
> So what can you do if you see this? First, confirm that it's actually
> happening. Record the interval between consecutive calls to poll() and check
> if the CommitFailedException is thrown after the session timeout has
> expired (the default is 30s, by the way). If it is, then there are basically
> two choices for dealing with this problem at the moment: 1) increase the
> session timeout to give more time for message processing, or 2) move the
> processing to another thread. If you do the latter, you have to be
> careful about coordinating commits with the message processing thread and
> how to notify it of rebalances (we have a ConsumerRebalanceListener that
> you can work with for this purpose). You can also try to tweak
> "max.partition.fetch.bytes," but this can be dangerous if you don't know
> the maximum size of the messages you have to handle.
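A minimal sketch of that diagnostic, assuming you control your own poll loop. PollIntervalMonitor is a hypothetical helper, not part of the Kafka client: it records when each poll() call starts and flags gaps longer than the session timeout. Timestamps are passed in explicitly so the logic can be checked without a broker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the Kafka client API): tracks the gap
// between consecutive poll() calls and flags gaps longer than the session timeout.
final class PollIntervalMonitor {
    private final long sessionTimeoutMs;
    private long lastPollMs = -1;   // -1 means no poll() recorded yet
    private long maxGapMs = 0;
    private final List<Long> overruns = new ArrayList<>();

    PollIntervalMonitor(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** Call right before each poll(), passing the current time in milliseconds. */
    void recordPoll(long nowMs) {
        if (lastPollMs >= 0) {
            long gap = nowMs - lastPollMs;
            maxGapMs = Math.max(maxGapMs, gap);
            if (gap > sessionTimeoutMs) {
                // No heartbeat could be sent during this gap, so the coordinator
                // has probably already evicted this consumer.
                overruns.add(gap);
            }
        }
        lastPollMs = nowMs;
    }

    /** Largest gap observed between two consecutive poll() calls. */
    long maxGapMs() { return maxGapMs; }

    /** Gaps (ms) that exceeded the session timeout, in the order observed. */
    List<Long> overruns() { return overruns; }
}
```

In a real loop you would call monitor.recordPoll(System.currentTimeMillis()) just before each consumer.poll(...) and log monitor.overruns() when a CommitFailedException appears; overruns close to the time of the failure point at slow message processing.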
>
> And for what it's worth, we're planning to add a new configuration
> "max.poll.records" to set an upper limit on the number of messages returned
> from poll() (assuming that KIP-41 is approved). This can make it easier to
> limit the message processing time so that there is less risk of running
> over the session timeout.
>
> Hope that helps!
>
> -Jason
>
> On Fri, Jan 15, 2016 at 3:26 AM, Franco Giacosa <fgiac...@gmail.com> wrote:
>
> > Hi,
> >
> > In the documentation for commitSync, it says the following about
> > CommitFailedException:
> >
> > * @throws org.apache.kafka.clients.consumer.CommitFailedException if the
> > commit failed and cannot be retried.
> > * This can only occur if you are using automatic group management with
> > {@link #subscribe(List)}
> >
> > 1. Can someone explain to me what automatic group management means? And if
> > it's just the consumer group, why is it that the commit may fail?
> >
> > 2. In a consumer group, each consumer has one or more partitions assigned,
> > right? So if the consumers are not sharing partitions, why is it that the
> > commit may fail and cannot be retried?
> >
> > 3. This also happens in commitAsync, right? I don't see it in the
> > documentation, so that's why I am asking.
> >
> > Thanks,
> > Franco.
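To make the answer to questions 1 and 2 concrete, here is a toy model of the liveness check Jason describes. ToyCoordinator and CommitRejectedException are hypothetical names standing in for the group coordinator and CommitFailedException; the model ignores rebalancing and partition assignment and is not how the broker is implemented. It only shows why retrying cannot help: once the session expires, the member is no longer in the group, so every later commit from it is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for CommitFailedException in this toy model.
final class CommitRejectedException extends RuntimeException {
    CommitRejectedException(String msg) { super(msg); }
}

// Toy model of the coordinator's session-timeout check. NOT the broker's code:
// no rebalancing, no partition assignment, no rejoin handling.
final class ToyCoordinator {
    private final long sessionTimeoutMs;
    // memberId -> time (ms) of the last heartbeat received from that member
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    ToyCoordinator(long sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; }

    void join(String memberId, long nowMs) { lastHeartbeat.put(memberId, nowMs); }

    /** In the real consumer, heartbeats are only sent from inside poll(). */
    void heartbeat(String memberId, long nowMs) {
        expireDeadMembers(nowMs);
        // An evicted member's heartbeat is ignored; it would have to rejoin.
        if (lastHeartbeat.containsKey(memberId)) {
            lastHeartbeat.put(memberId, nowMs);
        }
    }

    void commit(String memberId, long nowMs) {
        expireDeadMembers(nowMs);
        if (!lastHeartbeat.containsKey(memberId)) {
            // Retrying cannot succeed: the member is no longer part of the group.
            throw new CommitRejectedException(memberId + " was evicted from the group");
        }
    }

    boolean isMember(String memberId) { return lastHeartbeat.containsKey(memberId); }

    // Drop every member whose last heartbeat is older than the session timeout.
    private void expireDeadMembers(long nowMs) {
        lastHeartbeat.values().removeIf(last -> nowMs - last > sessionTimeoutMs);
    }
}
```

In this model the commit is checked against group membership rather than partition ownership: after an eviction the partitions may already belong to another member, so accepting the old member's offsets would be unsafe, even though no two members ever share a partition.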