Try setting props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 40000);
the default is 300000 ms.
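For context, a minimal sketch of a 0.9 consumer configured this way; the broker address, group id, and topic are placeholders, and session.timeout.ms matches the value discussed in this thread:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);           // value used in this thread
    props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 40000);             // default is 300000 ms
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my-topic"));                        // placeholder topic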
On Tue, Apr 26, 2016 at 11:34 AM, Phil Luckhurst <phil.luckhu...@encycle.com> wrote:

> Hi Vinay,
>
> Thanks for that information; it's good to know that will be fixed, but I'm
> not sure what would trigger this to happen in the 0.9.0.1 release. What
> would cause the metadata refresh to be called while the consumer is
> processing a batch of messages, where I'm committing after each message and
> each message is processed within session.timeout.ms=30000?
>
> I added a sleep call to my consumer so that it took 4 seconds to process
> each message in a batch where my session.timeout.ms=30000. Each
> commitSync(offsets) call results in the following log messages.
>
> 2016-04-26 16:07:08,877 DEBUG [pool-3-thread-2]
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Received
> successful heartbeat response.
> 2016-04-26 16:07:08,878 DEBUG [pool-3-thread-2]
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Committed
> offset 132 for partition phil-pa-1-device-update-1
>
> I ran this with a batch of messages that took longer than
> session.timeout.ms=30000 to process, but I never saw a metadata refresh.
> I searched through my older logs and I only seem to see metadata requests
> when the consumers first start running, and from a producer that is in the
> same process.
>
> I guess a consumer rebalance will also trigger a metadata refresh, but what
> else might?
>
> Thanks
> Phil Luckhurst
>
> -----Original Message-----
> From: vinay sharma [mailto:vinsharma.t...@gmail.com]
> Sent: 26 April 2016 13:24
> To: users@kafka.apache.org
> Subject: RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)
>
> Hi Phil,
>
> commitSync sends a heartbeat request on each call, but it seems that it
> somehow stops sending heartbeat requests after a metadata refresh until
> the next poll. I asked about this on the dev list and learned that this
> is fixed in 0.10.0.0, the next version. I have not gone into the details
> of the defect, but it seems something was fixed related to the time reset
> of the heartbeat task so that the next heartbeat request time is
> calculated correctly. From the next version, commitSync will act as a
> heartbeat, as per the defect.
>
> Regards,
> Vinay Sharma
>
> On Apr 26, 2016 4:53 AM, "Phil Luckhurst" <phil.luckhu...@encycle.com>
> wrote:
>
> > Hi Vinay,
> >
> > "Regarding identifying a rebalance, how about comparing the array used
> > for consumer pause with the current assignments of the consumer?"
> >
> > I did consider checking that in the commitSync exception handler, but
> > didn't try it because if this is the consumer that caused the
> > rebalance (i.e. the one that appears to be dead), I didn't think its
> > partition assignments would have been updated when handling the
> > exception; the ConsumerRebalanceListener callbacks have not yet been
> > called. I can give it a try though. That's why I thought having
> > commitSync throw an explicit 'rebalance in progress' type exception,
> > rather than just a KafkaException, would allow this to be easily
> > identified and handled.
> >
> > The information about the metadata request is useful; I'll watch out
> > for that if we change our commit logic.
> >
> > Thanks
> > Phil Luckhurst
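For reference, the per-message commitSync(offsets) pattern discussed above might look roughly like this; a sketch that assumes the consumer from the first snippet, with process() a hypothetical stand-in for the application's per-message work:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical per-message work
        // Commit the offset of the next message to read from this partition;
        // on 0.9.0.1 each commitSync also produces the heartbeat responses
        // seen in the log excerpt above.
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)));
    }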
> >
> > -----Original Message-----
> > From: vinay sharma [mailto:vinsharma.t...@gmail.com]
> > Sent: 25 April 2016 20:30
> > To: users@kafka.apache.org
> > Subject: Re: Detecting rebalance while processing ConsumerRecords
> > (0.9.0.1)
> >
> > Hi Phil,
> >
> > Regarding identifying a rebalance, how about comparing the array used
> > for consumer pause with the current assignments of the consumer?
> >
> > Regarding the rebalance after a metadata refresh request: that will not
> > happen if you are committing after each record. I have a session timeout
> > of 30000 ms, and if I commit the last processed records before the
> > session timeout then everything is fine, except that after a metadata
> > refresh request I see a rebalance, which causes "Error UNKNOWN_MEMBER_ID
> > occurred while committing offsets" on further commits from the consumer
> > until the next poll. This error means that even committing at regular
> > intervals (which sends a heartbeat) somehow does not save the consumer
> > from timing out during a metadata refresh. This issue does not happen if
> > I am committing after each record, that is every 2-4 seconds, or if a
> > commit happens right after the metadata refresh response.
> >
> > Regards,
> > Vinay Sharma
> >
> > On Mon, Apr 25, 2016 at 11:27 AM, Phil Luckhurst <
> > phil.luckhu...@encycle.com> wrote:
> >
> > > Hi Vinay,
> > >
> > > I'm currently calling commitSync(Map<TopicPartition, OffsetAndMetadata>
> > > offsets) after each message to just write the partition offset for
> > > that specific message. Our messages can take several seconds to
> > > process, and this only seems to be adding 1 or 2 milliseconds to the
> > > time, so it is not looking like a significant overhead and is acting
> > > as our heartbeat.
> > >
> > > WRT "I see that kafka sends a metadata refresh request after every
> > > 300000 ms (default) and even though nothing changed (no new
> > > consumer, broker or partition) this refresh generally triggers a
> > > rebalance (at least in my tests)."
> > >
> > > I'm not seeing this. We've got a ConsumerRebalanceListener
> > > implemented on our consumers and I don't see it get called
> > > even though I see lots of metadata requests being sent. We can also
> > > have quiet periods where we often exceed the 300000 ms refresh
> > > default, and those metadata requests don't trigger a rebalance either.
> > >
> > > I'm calling consumer.pause(consumer.assignment().toArray(new
> > > TopicPartition[0])) at the start of each batch and
> > > consumer.resume(consumer.assignment().toArray(new
> > > TopicPartition[0])) at the end. This allows us to call poll(0) in the
> > > message loop if we need to block on a message for more than
> > > session.timeout.ms (this can happen if an external system is
> > > temporarily unavailable). Again this seems to work ok and does not
> > > trigger a rebalance.
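A sketch of that pause/poll(0)/resume pattern, assuming the consumer and records from the earlier snippets; externalSystemAvailable() is a hypothetical stand-in for whatever condition forces the wait:

    // Pause all assigned partitions for the duration of the batch so that
    // poll(0) inside the loop keeps the session alive without fetching
    // more records.
    consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
    try {
        for (ConsumerRecord<String, String> record : records) {
            while (!externalSystemAvailable()) { // hypothetical blocking condition
                consumer.poll(0);                // heartbeats; returns nothing while paused
            }
            process(record);                     // hypothetical per-message work
        }
    } finally {
        consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));
    }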
> > > The only issue we've found is, as mentioned before, where a rebalance
> > > occurs while we are processing a batch of messages. When that
> > > happens the commitSync fails with a KafkaException whose message
> > > states this is due to a rebalance. We'd like to skip the rest of the
> > > batch when this happens, but to do that we'd need to know for sure
> > > that it was because of a rebalance, and a KafkaException could be
> > > thrown for other reasons. A KafkaRebalanceException, or even a method
> > > we could call on the consumer, would allow us to safely abort the
> > > current processing loop knowing that the remaining messages would be
> > > picked up by another consumer after the rebalance - that would stop
> > > us processing duplicates.
> > >
> > > Thanks
> > > Phil Luckhurst
> > >
> > > -----Original Message-----
> > > From: vinay sharma [mailto:vinsharma.t...@gmail.com]
> > > Sent: 22 April 2016 14:24
> > > To: users@kafka.apache.org
> > > Subject: Re: Detecting rebalance while processing ConsumerRecords
> > > (0.9.0.1)
> > >
> > > Hi Phil,
> > >
> > > Regarding pause and resume, I have not tried this approach, but I
> > > think it may not be feasible. If your consumer no longer has the
> > > partition assigned from which the record being processed was fetched,
> > > or even if the partition is somehow assigned to the consumer again,
> > > you may still not be able to do this, and you will see
> > > UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION_ID errors in the logs. Let me
> > > know if this approach works for you; I will try it then as well.
> > >
> > > Kafka never sends the same record to 2 consumers in the same consumer
> > > group. If another consumer got the same record, it means that the
> > > first consumer no longer has that particular partition assigned to
> > > it. Even if you want to commit from the first consumer, you will not
> > > be able to; the commit will throw an exception. Even after increasing
> > > the session timeout a rebalance can still occur. I see that Kafka
> > > sends a metadata refresh request every 300000 ms (default), and even
> > > though nothing has changed (no new consumer, broker or partition),
> > > this refresh generally triggers a rebalance (at least in my tests).
> > >
> > > Calling commitSync renews the session and keeps the consumer alive.
> > > commitSync is a blocking operation, so you may not want to call it
> > > for each record processed. You can instead try calling
> > > commitSync(offsets) just before starting to process a record, but
> > > only if, say, 75% of the configured session time has elapsed. This
> > > will keep your consumer alive during the occasional longer processing
> > > time and will also avoid committing every record. But as I said
> > > earlier, this will not guarantee that a rebalance will not happen. A
> > > metadata refresh or other change may still trigger a rebalance, but
> > > with the above approach at least a rebalance will not occur because
> > > of a session timeout during a longer processing time.
> > >
> > > If you maintain offsets outside of Kafka and configure consumers to
> > > coordinate with each other through these external offsets, then you
> > > can skip processing duplicate records even if Kafka sends the same
> > > records twice. Through these external offsets you will have to devise
> > > a way to skip a record that has already been processed by another
> > > consumer, or to wait if the same record is being processed by another
> > > consumer.
> > >
> > > Regards,
> > > Vinay Sharma
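One way to read the 75% suggestion above; a sketch only, not tested, again assuming the consumer and records from the earlier snippets:

    long sessionTimeoutMs = 30000;                 // matches session.timeout.ms
    long lastCommitMs = System.currentTimeMillis();
    for (ConsumerRecord<String, String> record : records) {
        // Commit before processing, but only once ~75% of the session
        // timeout has elapsed, so the commit doubles as a heartbeat
        // without paying the blocking cost on every record.
        if (System.currentTimeMillis() - lastCommitMs >= sessionTimeoutMs * 3 / 4) {
            consumer.commitSync(Collections.singletonMap(
                    new TopicPartition(record.topic(), record.partition()),
                    // this record is not yet processed, so commit its own
                    // offset, marking only the records before it as done
                    new OffsetAndMetadata(record.offset())));
            lastCommitMs = System.currentTimeMillis();
        }
        process(record); // hypothetical per-message work
    }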
> > > On Thu, Apr 21, 2016 at 2:09 PM, Phil Luckhurst <
> > > phil.luckhu...@encycle.com> wrote:
> > >
> > > > Thanks for all the responses. Unfortunately it seems that currently
> > > > there is no foolproof solution to this. It's not a problem with the
> > > > stored offsets, as it happens even if I do a commitSync after each
> > > > record is processed; it's the unprocessed records in the batch that
> > > > get processed twice.
> > > >
> > > > I'm now taking the approach of trying to limit the possibility of a
> > > > rebalance as much as possible by reducing the data returned by
> > > > poll. I'm also using the pause, poll, resume pattern to ensure the
> > > > consumer doesn't cause a rebalance if the processing loop takes
> > > > longer than session.timeout.ms.
> > > >
> > > > Cheers,
> > > > Phil
> > > >
> > > > On 21 Apr 2016 16:24, vinay sharma <vinsharma.t...@gmail.com>
> > > > wrote:
> > > > > Hi,
> > > > >
> > > > > By design, Kafka does ensure that it does not send the same
> > > > > record to multiple consumers in the same consumer group. The
> > > > > issue arises when a rebalance happens while processing is going
> > > > > on and records are not yet committed. In my view there are only
> > > > > 2 possible solutions to it:
> > > > > 1) As mentioned in the documentation, store offsets outside of
> > > > > Kafka (
> > > > > https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > ). This is a complete solution, but it definitely adds extra
> > > > > development work and also extra processing for each message. The
> > > > > problem may still exist if, at the time of a crash, the consumer
> > > > > was out of sync with both the external custom offset storage and
> > > > > the offsets stored in Kafka.
> > > > > 2) As mentioned in the fix for defect 919 (
> > > > > https://issues.apache.org/jira/browse/KAFKA-919 ), set autocommit
> > > > > to true. This will make Kafka commit fetched records before
> > > > > rebalancing. The only drawback is that some records may never be
> > > > > processed if the consumer crashes while processing records that
> > > > > were already marked committed due to a rebalance.
> > > > >
> > > > > Regards,
> > > > > Vinay Sharma
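For option (1), the KafkaConsumer javadoc linked above outlines the pattern; a minimal sketch, assuming the consumer from the first snippet plus imports of java.util.Collection and org.apache.kafka.clients.consumer.ConsumerRebalanceListener, where saveOffset() and readOffset() are hypothetical helpers backed by an atomic external store updated together with the processed results:

    consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Persist the next offset to process for each partition being
            // taken away, so another consumer can pick up exactly there.
            for (TopicPartition tp : partitions) {
                saveOffset(tp, consumer.position(tp)); // hypothetical helper
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Resume from the externally stored position rather than the
            // offsets committed to Kafka.
            for (TopicPartition tp : partitions) {
                consumer.seek(tp, readOffset(tp));     // hypothetical helper
            }
        }
    });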