Thanks for the answer, Jason. Is there any way for the consumer to rejoin the consumer group?
2016-01-19 18:27 GMT+01:00 Jason Gustafson <ja...@confluent.io>:

> Hey Franco,
>
> This time I'll answer briefly ;)
>
> 1) Heartbeats also get invoked when you call another blocking operation
> such as commitSync().
> 2) If all consumers in the group die, the coordinator doesn't really do
> anything other than clean up some group state. In particular, it does not
> remove offset commits.
>
> -Jason
>
> On Sun, Jan 17, 2016 at 11:03 AM, Franco Giacosa <fgiac...@gmail.com>
> wrote:
>
> > Hi Jason,
> >
> > Thanks for the detailed explanation, I hope that KIP-41 gets added fast.
> >
> > A few questions:
> > (1) Is the only way to send a heartbeat to poll? If I do poll(0),
> > does it renew the token?
> > (2) What happens to the coordinator if all consumers die?
> >
> > Franco.
> >
> > 2016-01-15 19:30 GMT+01:00 Jason Gustafson <ja...@confluent.io>:
> >
> > > Hi Franco,
> > >
> > > The new consumer combines the functionality of the older simple and
> > > high-level consumers. When used in simple mode, you have to assign the
> > > partitions that you want to read from using assign(). In this case, the
> > > consumer works alone and not in a group. Alternatively, if you use the
> > > subscribe() API, then a special Kafka broker (known as the coordinator)
> > > will coordinate all consumers with the same groupId to distribute the
> > > partitions of all subscribed topics among them. Each partition will be
> > > assigned automatically to exactly one consumer in the group. This is
> > > what is meant by automatic group management.
> > >
> > > Group management in the new consumer basically works like this: all
> > > members send a request to the coordinator indicating that they need to
> > > join the group. Once joined, the consumer begins sending heartbeats to
> > > the coordinator. If no heartbeat is received before the expiration of
> > > the session timeout (configured with session.timeout.ms), then the
> > > coordinator marks the consumer dead and asks all other consumers in the
> > > group to rejoin (so that partitions can be reassigned). I've skipped
> > > some details, but those are the important points.
> > >
> > > Now, a relatively controversial feature of the new consumer is its
> > > single-threaded design. Instead of sending fetches and heartbeats in a
> > > background thread, all IO is done in the foreground when the user calls
> > > poll(). This implies, in particular, that poll() must be called at
> > > least once per session timeout interval in order to send heartbeats. If
> > > the session timeout expires between consecutive calls to poll(), then
> > > the coordinator will think the consumer is dead and it will reassign
> > > its partitions to other members. If you then try to commit offsets from
> > > that consumer, the coordinator will reject the request since it has
> > > already kicked it out of the group. When this happens, KafkaConsumer
> > > throws CommitFailedException since there is no way that the commit can
> > > succeed. And yes, you can see this in commitAsync as well (in the
> > > OffsetCommitCallback).
> > >
> > > Of course, the main reason why the consumer would fail to poll often
> > > enough is that message processing is also done in the same thread. If
> > > it takes longer than the session timeout to handle the previous batch
> > > of messages, then the consumer gets kicked out. It may seem a little
> > > strange to have both message processing and heartbeating in the same
> > > thread, but ensuring consumer liveness was one of the design
> > > objectives. If heartbeats were sent from a background thread, then the
> > > message processor could die while the heartbeat thread remained active.
> > > In this case, the consumer would hold onto the partitions indefinitely
> > > until it had been shut down.
> > >
> > > So what can you do if you see this? First, confirm that it's actually
> > > happening. Record the interval between consecutive calls to poll() and
> > > check if the CommitFailedException is thrown after the session timeout
> > > has expired (the default is 30s by the way). If it is, then there are
> > > basically two choices for dealing with this problem at the moment:
> > > 1) increase the session timeout to give more time for message
> > > processing, and 2) move the processing to another thread. If you do the
> > > latter of these, you have to be careful about coordinating commits with
> > > the message processing thread and about how to notify it of rebalances
> > > (we have a ConsumerRebalanceListener that you can work with for this
> > > purpose). You can also try to tweak "max.partition.fetch.bytes", but
> > > this can be dangerous if you don't know the maximum size of the
> > > messages you have to handle.
> > >
> > > And for what it's worth, we're planning to add a new configuration
> > > "max.poll.records" to set an upper limit on the number of messages
> > > returned from poll() (assuming that KIP-41 is approved). This can make
> > > it easier to limit the message processing time so that there is less
> > > risk of running over the session timeout.
> > >
> > > Hope that helps!
> > >
> > > -Jason
> > >
> > > On Fri, Jan 15, 2016 at 3:26 AM, Franco Giacosa <fgiac...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > The documentation for commitSync says the following about the
> > > > CommitFailedException:
> > > >
> > > > * @throws org.apache.kafka.clients.consumer.CommitFailedException if the
> > > > commit failed and cannot be retried.
> > > > * This can only occur if you are using automatic group management with
> > > > {@link #subscribe(List)}
> > > >
> > > > 1. Can someone explain to me what automatic group management means?
> > > > And if it's just the consumer group, why is it that the commit may
> > > > fail?
> > > >
> > > > 2. In a Consumer Group, each consumer has one or many partitions
> > > > assigned, right? So if the consumers are not sharing partitions, why
> > > > is it that the commit may fail and cannot be retried?
> > > >
> > > > 3. This also happens in commitAsync, right? I don't see it in the
> > > > documentation, so that's why I am asking.
> > > >
> > > > Thanks,
> > > > Franco.
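
For anyone who wants to try the diagnostic step Jason describes (record the interval between consecutive calls to poll() and compare it with the session timeout), here is a minimal sketch in plain Java. PollIntervalTracker and its methods are hypothetical names made up for this illustration and are not part of the Kafka client. The 30 s value is the default mentioned in the thread, and the timestamps in main() are simulated rather than taken from a real consumer.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not a Kafka class): remembers when poll() was last
 * called and reports gaps that are longer than the session timeout.
 */
final class PollIntervalTracker {
    private final long sessionTimeoutMs;
    private final List<Long> overruns = new ArrayList<>();
    private boolean hasPolled = false;
    private long lastPollMs = 0L;
    private long maxGapMs = 0L;

    PollIntervalTracker(long sessionTimeoutMs) {
        if (sessionTimeoutMs <= 0) {
            throw new IllegalArgumentException("sessionTimeoutMs must be positive");
        }
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /**
     * Records a poll at the given timestamp. Returns true if the gap since
     * the previous recorded poll is longer than the session timeout.
     */
    boolean recordPoll(long nowMs) {
        boolean overrun = false;
        if (hasPolled) {
            long gapMs = nowMs - lastPollMs;
            maxGapMs = Math.max(maxGapMs, gapMs);
            if (gapMs > sessionTimeoutMs) {
                overruns.add(gapMs);
                overrun = true;
            }
        }
        hasPolled = true;
        lastPollMs = nowMs;
        return overrun;
    }

    /** Longest gap seen so far between two recorded polls. */
    long maxGapMs() {
        return maxGapMs;
    }

    /** Copy of every gap that was longer than the session timeout. */
    List<Long> overruns() {
        return new ArrayList<>(overruns);
    }

    public static void main(String[] args) {
        // Assumption: session.timeout.ms is 30 s, the default quoted above.
        PollIntervalTracker tracker = new PollIntervalTracker(30_000L);
        // Simulated poll timestamps; the third gap is 35 s.
        long[] pollTimesMs = {0L, 5_000L, 12_000L, 47_000L, 50_000L};
        for (long t : pollTimesMs) {
            if (tracker.recordPoll(t)) {
                System.out.println("Gap before the poll at " + t
                        + " ms was longer than the session timeout");
            }
        }
        System.out.println("Longest gap: " + tracker.maxGapMs() + " ms");
    }
}
```

In a real consumer loop one would presumably call recordPoll(System.currentTimeMillis()) right before each consumer.poll(...) and log the result. If a CommitFailedException lines up with a reported overrun, that supports the explanation above; if it does not, the cause is probably something else.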