Hi Vinay,

Thanks for that information; it's good to know that this will be fixed, but I'm not sure what would trigger it in the 0.9.0.1 release. What would cause a metadata refresh to be called while the consumer is processing a batch of messages, given that I'm committing after each message and each message is processed within session.timeout.ms=30000?
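For reference, this is roughly the shape of my processing loop (a minimal sketch; process() is our handler and the poll timeout is illustrative):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // our handler; takes a few seconds per message
        // The committed offset is the position of the next record to
        // read, hence offset() + 1.
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        consumer.commitSync(offsets);
    }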
I added a sleep call to my consumer so that it took 4 seconds to process each message in a batch, with session.timeout.ms=30000. Each commitSync(offsets) call results in the following log messages:

2016-04-26 16:07:08,877 DEBUG [pool-3-thread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Received successful heartbeat response.
2016-04-26 16:07:08,878 DEBUG [pool-3-thread-2] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Committed offset 132 for partition phil-pa-1-device-update-1

I ran this with a batch of messages that took longer than session.timeout.ms=30000 to process, but I never saw a metadata refresh. I searched through my older logs and I only seem to see metadata requests when the consumers first start running, and from a producer that is in the same process. I guess a consumer rebalance will also trigger a metadata refresh, but what else might?

Thanks
Phil Luckhurst

-----Original Message-----
From: vinay sharma [mailto:vinsharma.t...@gmail.com]
Sent: 26 April 2016 13:24
To: users@kafka.apache.org
Subject: RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Hi Phil,

commitSync sends a heartbeat request on each call, but it seems that somehow it stops sending heartbeat requests after a metadata refresh until the next poll. I asked about this on the dev list and came to know that this is fixed in 0.10.0.0, the next version. I have not gone into the details of the defect, but it seems something was fixed in the time reset of the heartbeat task so that the next heartbeat request time is calculated correctly. From the next version, commitSync will act as a heartbeat, as per the defect.

Regards,
Vinay Sharma

On Apr 26, 2016 4:53 AM, "Phil Luckhurst" <phil.luckhu...@encycle.com> wrote:
> Hi Vinay,
>
> "Regarding identifying a rebalance, how about comparing the array used
> for consumer pause with the current assignments of the consumer?"
>
> I did consider checking that in the commitSync exception handler, but
> didn't try it because if this is the consumer that caused the rebalance
> (i.e. the one that appears to be dead), I didn't think its partition
> assignments would have been updated when handling the exception, as the
> ConsumerRebalanceListener callbacks have not yet been called. I can give
> it a try though. That's why I thought having commitSync throw an
> explicit 'rebalance in progress' type exception, rather than just a
> KafkaException, would allow this to be easily identified and handled.
>
> The information about the metadata request is useful; I'll watch out
> for that if we change our commit logic.
>
> Thanks
> Phil Luckhurst
>
> -----Original Message-----
> From: vinay sharma [mailto:vinsharma.t...@gmail.com]
> Sent: 25 April 2016 20:30
> To: users@kafka.apache.org
> Subject: Re: Detecting rebalance while processing ConsumerRecords
> (0.9.0.1)
>
> Hi Phil,
>
> Regarding identifying a rebalance, how about comparing the array used
> for consumer pause with the current assignments of the consumer?
>
> Regarding the rebalance after a metadata refresh request: that will not
> happen if you are committing after each record. I have a session timeout
> of 30000 ms, and if I commit the last processed records before the
> session times out then everything is fine, except that after a metadata
> refresh request I see a rebalance, which causes "Error UNKNOWN_MEMBER_ID
> occurred while committing offsets" on further commits from that consumer
> until the next poll. This error means that even committing at regular
> intervals (which sends a heartbeat) somehow does not save the consumer
> from being timed out during a metadata refresh. The issue does not
> happen if I am committing after each record, that is every 2-4 seconds,
> or if a commit happens right after the metadata refresh response.
>
> Regards,
> Vinay Sharma
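A minimal sketch of the assignment-comparison check discussed above, assuming the assignment is snapshotted when the batch is paused and that offsets is built as in the earlier sketch; as Phil notes, this check may miss on the consumer that was timed out, because its view of the assignment can be stale until the next poll():

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;

    // Snapshot the assignment at the point the batch is paused.
    Set<TopicPartition> pausedAssignment = new HashSet<>(consumer.assignment());
    consumer.pause(pausedAssignment.toArray(new TopicPartition[0]));

    try {
        consumer.commitSync(offsets);
    } catch (KafkaException e) {
        // If the current assignment no longer matches the snapshot, a
        // rebalance has happened and the rest of the batch should be
        // abandoned so another consumer can pick the records up.
        boolean rebalanced = !consumer.assignment().equals(pausedAssignment);
        if (rebalanced) {
            // abort the remaining records in this batch
        }
    }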
> On Mon, Apr 25, 2016 at 11:27 AM, Phil Luckhurst
> <phil.luckhu...@encycle.com> wrote:
> > Hi Vinay,
> >
> > I'm currently calling commitSync(Map<TopicPartition, OffsetAndMetadata>
> > offsets) after each message to just write the partition offset for that
> > specific message. Our messages can take several seconds to process and
> > this only seems to be adding 1 or 2 milliseconds to the time, so it is
> > not looking like a significant overhead and is acting as our heartbeat.
> >
> > WRT "I see that kafka sends a metadata refresh request after every
> > 300000 ms (default) and even though nothing changed (no new consumer,
> > broker or partition) this refresh generally triggers a rebalance (at
> > least in my tests)."
> >
> > I'm not seeing this. We've got a ConsumerRebalanceListener implemented
> > on our consumers and I don't see it get called, even though I see lots
> > of metadata requests being sent. We can also have quiet periods where
> > we often exceed the 300000 ms refresh default, and those metadata
> > requests don't trigger a rebalance either.
> >
> > I'm calling consumer.pause(consumer.assignment().toArray(new
> > TopicPartition[0])) at the start of each batch and
> > consumer.resume(consumer.assignment().toArray(new TopicPartition[0]))
> > at the end. This allows us to call poll(0) in the message loop if we
> > need to block on a message for more than session.timeout.ms (this can
> > happen if an external system is temporarily unavailable). Again this
> > seems to work ok and does not trigger a rebalance.
> >
> > The only issue we've found is, as mentioned before, where a rebalance
> > occurs while we are processing a batch of messages. When that happens
> > the commitSync fails with a KafkaException and the message states this
> > is due to a rebalance. We'd like to skip the rest of the batch when
> > this happens, but to do that we'd need to know for sure that it was
> > because of a rebalance, and a KafkaException could be thrown for other
> > reasons. A KafkaRebalanceException, or even a method we could call on
> > the consumer, would allow us to safely abort the current processing
> > loop knowing that the remaining messages would be picked up by another
> > consumer after the rebalance - that would stop us processing
> > duplicates.
> >
> > Thanks
> > Phil Luckhurst
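A minimal sketch of the pause / poll(0) / resume pattern Phil describes above; externalSystemAvailable() and process() stand in for the application's own logic:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    // Pause all assigned partitions at the start of the batch so that
    // poll() can be used purely as a heartbeat without fetching records.
    consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));

    for (ConsumerRecord<String, String> record : records) {
        // If we have to block for longer than session.timeout.ms, keep
        // polling; while paused, poll(0) heartbeats but returns no records.
        while (!externalSystemAvailable()) {
            consumer.poll(0);
        }
        process(record);
    }

    // Resume at the end of the batch so the next poll() fetches again.
    consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));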
> > -----Original Message-----
> > From: vinay sharma [mailto:vinsharma.t...@gmail.com]
> > Sent: 22 April 2016 14:24
> > To: users@kafka.apache.org
> > Subject: Re: Detecting rebalance while processing ConsumerRecords
> > (0.9.0.1)
> >
> > Hi Phil,
> >
> > Regarding pause and resume: I have not tried this approach, but I
> > think it may not be feasible. If your consumer no longer has the
> > partition assigned from which the record being processed was fetched,
> > or even if the partition is somehow assigned to the consumer again,
> > you may still not be able to do this and will see UNKNOWN_MEMBER_ID or
> > ILLEGAL_GENERATION_ID errors in the logs. Let me know if this approach
> > works for you; I will also try it then.
> >
> > Kafka never sends the same record to 2 consumers in the same consumer
> > group. If another consumer got the same record, it means that the
> > first consumer no longer has that particular partition assigned to it.
> > Even if you want to commit from the first consumer you will not be
> > able to, and the commit will throw an exception. Even after increasing
> > the session timeout, a rebalance can still occur. I see that kafka
> > sends a metadata refresh request every 300000 ms (default), and even
> > though nothing has changed (no new consumer, broker or partition) this
> > refresh generally triggers a rebalance (at least in my tests).
> >
> > Calling commitSync renews the session and keeps the consumer alive.
> > commitSync is a blocking operation, so you may not want to call it for
> > each record. You could try calling commitSync(offsets) just before
> > starting to process a record, but only if, say, 75% of the configured
> > session time has elapsed. This will keep your consumer alive during
> > rare longer processing times and will also not commit every record.
> > But as I said earlier, this will not guarantee that a rebalance never
> > happens; a metadata refresh or other change may still trigger one. If
> > you take the above approach, then at least a rebalance will not occur
> > because of a session timeout during a longer processing time.
> >
> > If you maintain offsets outside of kafka and configure consumers to
> > coordinate with each other through these external offsets, then you
> > can skip processing duplicate records even if kafka sends the same
> > records twice. Through these external offsets you will have to devise
> > a way to skip a record if it has already been processed by another
> > consumer, or to wait if the same record is in process by another
> > consumer.
> >
> > Regards,
> > Vinay Sharma
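A minimal sketch of the commit-when-75%-elapsed heuristic Vinay suggests above; the processed map accumulates the next position to read for each partition, and the names and threshold are illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    long sessionTimeoutMs = 30000; // must match session.timeout.ms
    long commitThresholdMs = (long) (sessionTimeoutMs * 0.75);
    long lastCommitMs = System.currentTimeMillis();
    Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();

    for (ConsumerRecord<String, String> record : records) {
        // Commit (and thereby heartbeat) only when 75% of the session
        // timeout has elapsed since the last commit.
        if (!processed.isEmpty()
                && System.currentTimeMillis() - lastCommitMs > commitThresholdMs) {
            consumer.commitSync(processed);
            lastCommitMs = System.currentTimeMillis();
        }
        process(record);
        processed.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }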
> > On Thu, Apr 21, 2016 at 2:09 PM, Phil Luckhurst
> > <phil.luckhu...@encycle.com> wrote:
> > > Thanks for all the responses. Unfortunately it seems that currently
> > > there is no foolproof solution to this. It's not a problem with the
> > > stored offsets, as it will happen even if I do a commitSync after
> > > each record is processed; it's the unprocessed records in the batch
> > > that get processed twice.
> > >
> > > I'm now taking the approach of trying to limit the possibility of a
> > > rebalance as much as possible by reducing the data returned by poll.
> > > I'm also using the pause, poll, resume pattern to ensure the
> > > consumer doesn't cause a rebalance if the processing loop takes
> > > longer than session.timeout.ms.
> > >
> > > Cheers,
> > > Phil
> > >
> > > On 21 Apr 2016 16:24, vinay sharma <vinsharma.t...@gmail.com> wrote:
> > > > Hi,
> > > >
> > > > By design Kafka does ensure that it does not send the same record
> > > > to multiple consumers in the same consumer group. The issue is
> > > > caused by a rebalance while processing is going on and records are
> > > > not yet committed. In my view there are only 2 possible solutions:
> > > > 1) As mentioned in the documentation, store offsets outside of
> > > > kafka (https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html);
> > > > a sketch of this pattern follows this message. This is a complete
> > > > solution but will definitely add extra development work and extra
> > > > processing for each message. The problem may still exist if, at
> > > > the time of a crash, the consumer was out of sync with both the
> > > > external custom offset storage and the offsets stored in kafka.
> > > > 2) As mentioned in the fix for defect 919
> > > > (https://issues.apache.org/jira/browse/KAFKA-919), set autocommit
> > > > to true. This will make kafka commit fetched records before
> > > > rebalancing. The only drawback is that some records may never be
> > > > processed if the consumer crashes while processing records which
> > > > are already marked committed due to the rebalance.
> > > >
> > > > Regards,
> > > > Vinay Sharma
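A minimal sketch of option 1, following the rebalance-listener pattern described in the KafkaConsumer javadoc linked above; the topic name is illustrative, and saveOffsetExternally() and readOffsetExternally() are hypothetical helpers for whatever external store is used:

    import java.util.Arrays;
    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Save the position reached on each partition to the external
            // store before the partitions are reassigned.
            for (TopicPartition tp : partitions) {
                saveOffsetExternally(tp, consumer.position(tp));
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Resume from the externally stored position rather than the
            // offset committed in kafka.
            for (TopicPartition tp : partitions) {
                consumer.seek(tp, readOffsetExternally(tp));
            }
        }
    });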