Hey Franco,

This time I'll answer briefly ;)
1) Heartbeats are also sent when you call another blocking operation such as commitSync().
2) If all consumers in the group die, the coordinator doesn't really do anything other than clean up some group state. In particular, it does not remove offset commits.

-Jason

On Sun, Jan 17, 2016 at 11:03 AM, Franco Giacosa <fgiac...@gmail.com> wrote:

> Hi Jason,
>
> Thanks for the detailed explanation. I hope that KIP-41 gets added fast.
>
> A few questions:
> (1) Is polling the only way to send a heartbeat? If I call poll(0), does it renew the token?
> (2) What happens to the coordinator if all consumers die?
>
> Franco.
>
> 2016-01-15 19:30 GMT+01:00 Jason Gustafson <ja...@confluent.io>:
>
> > Hi Franco,
> >
> > The new consumer combines the functionality of the older simple and high-level consumers. When used in simple mode, you have to assign the partitions that you want to read from using assign(). In this case, the consumer works alone and not in a group. Alternatively, if you use the subscribe() API, then a special Kafka broker (known as the coordinator) will coordinate all consumers with the same groupId to distribute the partitions of all subscribed topics among them. Each partition will be assigned automatically to exactly one consumer in the group. This is what is meant by automatic group management.
> >
> > Group management in the new consumer basically works like this: all members send a request to the coordinator indicating that they need to join the group. Once joined, the consumer begins sending heartbeats to the coordinator. If no heartbeat is received before the session timeout (configured with session.timeout.ms) expires, the coordinator marks the consumer dead and asks all other consumers in the group to rejoin (so that partitions can be reassigned). I've skipped some details, but those are the important points.
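The join, heartbeat and expiry cycle described above can be illustrated with a small simulation of the coordinator's bookkeeping. This is a sketch under simplifying assumptions, not Kafka's implementation. `SessionTracker` and its methods are hypothetical, time is passed in explicitly as milliseconds, and a "generation" counter stands in for a rebalance. The 30 000 ms timeout mirrors the 30s default Jason mentions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of coordinator-side liveness tracking; NOT Kafka code.
public class SessionTracker {
    private final long sessionTimeoutMs;
    // memberId -> timestamp (ms) of the last heartbeat the coordinator saw
    private final Map<String, Long> lastHeartbeat = new HashMap<>();
    // Bumped on every membership change; stands in for a rebalance.
    private int generation = 0;

    public SessionTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // A member joins the group; membership changed, so bump the generation.
    public void join(String memberId, long nowMs) {
        lastHeartbeat.put(memberId, nowMs);
        generation++;
    }

    // Records a heartbeat; returns false if the member was already expelled.
    public boolean heartbeat(String memberId, long nowMs) {
        if (!lastHeartbeat.containsKey(memberId)) {
            return false;
        }
        lastHeartbeat.put(memberId, nowMs);
        return true;
    }

    // Expels every member whose last heartbeat is older than the session timeout.
    public List<String> expire(long nowMs) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowMs - e.getValue() > sessionTimeoutMs) {
                dead.add(e.getKey());
            }
        }
        // Remove after iterating to avoid modifying the map mid-iteration.
        for (String id : dead) {
            lastHeartbeat.remove(id);
        }
        if (!dead.isEmpty()) {
            generation++; // survivors would be asked to rejoin
        }
        return dead;
    }

    public boolean isMember(String memberId) {
        return lastHeartbeat.containsKey(memberId);
    }

    public int generation() {
        return generation;
    }

    public static void main(String[] args) {
        SessionTracker coordinator = new SessionTracker(30_000);
        coordinator.join("consumer-1", 0);
        coordinator.join("consumer-2", 0);
        coordinator.heartbeat("consumer-1", 25_000); // consumer-1 polls in time
        // consumer-2 spends 40s processing and never heartbeats
        List<String> dead = coordinator.expire(40_000);
        System.out.println("expired: " + dead);
        System.out.println("generation: " + coordinator.generation());
        boolean accepted = coordinator.heartbeat("consumer-2", 41_000);
        System.out.println("late heartbeat accepted: " + accepted);
    }
}
```

Running `main` prints `expired: [consumer-2]`, `generation: 3` and `late heartbeat accepted: false`. consumer-1 heartbeated 15 s before the check and survives. consumer-2 is expelled, so anything it sends afterwards is rejected. That rejection is the analogue of the CommitFailedException case discussed in the thread.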
> >
> > Now, a relatively controversial feature of the new consumer is its single-threaded design. Instead of sending fetches and heartbeats in a background thread, all IO is done in the foreground when the user calls poll(). In particular, this implies that poll() must be called at least once per session timeout interval in order to send heartbeats. If the session timeout expires between consecutive calls to poll(), then the coordinator will think the consumer is dead and will reassign its partitions to other members. If you then try to commit offsets from that consumer, the coordinator will reject the request since it has already kicked the consumer out of the group. When this happens, KafkaConsumer throws CommitFailedException since there is no way that the commit can succeed. And yes, you can see this in commitAsync as well (in the OffsetCommitCallback).
> >
> > Of course, the main reason the consumer would fail to poll often enough is that message processing is also done in the same thread. If handling the previous batch of messages takes longer than the session timeout, then the consumer gets kicked out. It may seem a little strange to have both message processing and heartbeating in the same thread, but ensuring consumer liveness was one of the design objectives. If heartbeats were sent from a background thread, then the message processor could die while the heartbeat thread remained active. In this case, the consumer would hold onto its partitions indefinitely until it was shut down.
> >
> > So what can you do if you see this? First, confirm that it's actually happening. Record the interval between consecutive calls to poll() and check whether the CommitFailedException is thrown after the session timeout has expired (the default is 30s, by the way).
> > If it is, then there are basically two choices for dealing with this problem at the moment: 1) increase the session timeout to give more time for message processing, or 2) move the processing to another thread. If you do the latter, you have to be careful about coordinating commits with the message processing thread and about how to notify it of rebalances (we have a ConsumerRebalanceListener that you can work with for this purpose). You can also try to tweak "max.partition.fetch.bytes", but this can be dangerous if you don't know the maximum size of the messages you have to handle.
> >
> > And for what it's worth, we're planning to add a new configuration, "max.poll.records", to set an upper limit on the number of messages returned from poll() (assuming that KIP-41 is approved). This can make it easier to limit the message processing time so that there is less risk of running over the session timeout.
> >
> > Hope that helps!
> >
> > -Jason
> >
> > On Fri, Jan 15, 2016 at 3:26 AM, Franco Giacosa <fgiac...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > The documentation for commitSync says the following about CommitFailedException:
> > >
> > > * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
> > > * This can only occur if you are using automatic group management with {@link #subscribe(List)}
> > >
> > > 1. Can someone explain to me what automatic group management means? And if it's just the consumer group, why might the commit fail?
> > >
> > > 2. In a consumer group, each consumer has one or more partitions assigned, right? So if the consumers are not sharing partitions, why might the commit fail and not be retryable?
> > >
> > > 3. Does this also happen with commitAsync? I don't see it in the documentation, which is why I'm asking.
> > >
> > > Thanks,
> > > Franco.
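Jason suggests a first diagnostic step: record the interval between consecutive calls to poll() and compare it with the session timeout. That step can be sketched in plain Java. `PollIntervalTracker` is a hypothetical helper, not part of kafka-clients. In a real consumer loop you would presumably call `onPoll(System.currentTimeMillis())` just before each `consumer.poll(...)`. Here, simulated timestamps stand in for that loop. The model is simplified: per Jason's later reply, blocking calls such as commitSync() also send heartbeats, so a long poll gap is a warning sign rather than proof that the session expired.

```java
// Hypothetical helper (NOT part of kafka-clients): tracks the gap between
// consecutive poll() calls so it can be compared with session.timeout.ms.
public class PollIntervalTracker {
    private final long sessionTimeoutMs;
    private long lastPollMs = -1; // -1 means "no poll recorded yet"
    private long maxGapMs = 0;

    public PollIntervalTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Call with the current time just before each poll(); returns the gap
    // since the previous call, or 0 for the first call.
    public long onPoll(long nowMs) {
        long gap = (lastPollMs < 0) ? 0 : nowMs - lastPollMs;
        lastPollMs = nowMs;
        maxGapMs = Math.max(maxGapMs, gap);
        return gap;
    }

    public long maxGapMs() {
        return maxGapMs;
    }

    // True if at least one gap exceeded the session timeout, i.e. the
    // coordinator may have expelled this consumer (so a later commit could fail).
    public boolean sessionMayHaveExpired() {
        return maxGapMs > sessionTimeoutMs;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 30_000; // the 30s default mentioned in the thread
        PollIntervalTracker tracker = new PollIntervalTracker(sessionTimeoutMs);
        long[] pollTimesMs = {0, 5_000, 12_000, 47_000}; // simulated poll() timestamps
        for (long t : pollTimesMs) {
            long gap = tracker.onPoll(t);
            if (gap > sessionTimeoutMs) {
                System.out.println("poll gap of " + gap + " ms exceeds session timeout");
            }
        }
        System.out.println("max gap: " + tracker.maxGapMs() + " ms");
    }
}
```

With these timestamps, the 35 000 ms gap between the last two polls exceeds the 30 000 ms timeout. `main` therefore prints `poll gap of 35000 ms exceeds session timeout` followed by `max gap: 35000 ms`. If such gaps line up with the CommitFailedException, the remedies from the thread apply. You can raise session.timeout.ms, or you can shrink the per-poll processing time, for example with the max.poll.records setting proposed in KIP-41.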