try setting

props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 40000);

default is 300000 ms
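
For context, a minimal sketch of where that setting fits; the broker
address, group id, and deserializers below are placeholders rather than
anything from this thread:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// refresh cluster metadata every 40s instead of the default 300000 ms
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 40000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);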

On Tue, Apr 26, 2016 at 11:34 AM, Phil Luckhurst <phil.luckhu...@encycle.com
> wrote:

> Hi Vinay,
>
> Thanks for that information, it's good to know that will be fixed, but I'm
> not sure what would trigger this to happen in the 0.9.0.1 release. What
> would cause the metadata refresh to be called while the consumer is
> processing a batch of messages, where I'm committing after each message and
> each message is processed within session.timeout.ms=30000?
>
> I added a sleep call to my consumer so that it took 4 seconds to process
> each message in a batch where my session.timeout.ms=30000. Each
> commitSync(offsets) call results in the following log messages.
>
> 2016-04-26 16:07:08,877 DEBUG [pool-3-thread-2]
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Received
> successful heartbeat response.
> 2016-04-26 16:07:08,878 DEBUG [pool-3-thread-2]
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Committed
> offset 132 for partition phil-pa-1-device-update-1
>
> I ran this with a batch of messages that took longer than
> session.timeout.ms=30000 to process, but I never saw a metadata
> refresh. I searched through my older logs and I only seem to see
> metadata requests when the consumers first start running and from a
> producer that is in the same process.
>
> I guess a consumer rebalance will also trigger a metadata refresh but what
> else might?
>
> Thanks
> Phil Luckhurst
>
> -----Original Message-----
> From: vinay sharma [mailto:vinsharma.t...@gmail.com]
> Sent: 26 April 2016 13:24
> To: users@kafka.apache.org
> Subject: RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)
>
> Hi Phil,
>
> CommitSync sends a heartbeat request on each call, but it seems that
> somehow it stops sending heartbeat requests after a metadata refresh until
> the next poll. I asked about this on the dev list and learned that this is
> fixed in 0.10.0.0, the next version. I have not gone into the details of
> the defect, but it seems the fix is related to resetting the heartbeat
> task's timer so that the next heartbeat request time is calculated
> correctly. From the next version, commitSync will act as a heartbeat, as
> per the defect.
>
> Regards,
> Vinay Sharma
> On Apr 26, 2016 4:53 AM, "Phil Luckhurst" <phil.luckhu...@encycle.com>
> wrote:
>
> > Hi Vinay,
> >
> > "Regarding identifying a rebalance, how about comparing array used for
> > consumer pause with current assignments of consumer?"
> >
> > I did consider checking that in the commitSync exception handler but
> > didn't try it because, if this is the consumer that caused the
> > rebalance (i.e. the one that appears to be dead), I didn't think its
> > partition assignments would have been updated when handling the
> > exception, as the ConsumerRebalanceListener callbacks have not yet been
> > called - I can give it a try though. That's why I thought having
> > commitSync throw an explicit 'rebalance in progress' type exception
> > rather than just a KafkaException would allow this to be easily
> > identified and handled.
> >
> > The information about the metadata request is useful, I'll watch out
> > for that if we change our commit logic.
> >
> > Thanks
> > Phil Luckhurst
> >
> >
> > -----Original Message-----
> > From: vinay sharma [mailto:vinsharma.t...@gmail.com]
> > Sent: 25 April 2016 20:30
> > To: users@kafka.apache.org
> > Subject: Re: Detecting rebalance while processing ConsumerRecords
> > (0.9.0.1)
> >
> > Hi Phil,
> >
> > Regarding identifying a rebalance, how about comparing the array used
> > for the consumer pause with the consumer's current assignments?
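> >
> > A rough sketch of what I mean, assuming pausedPartitions is the array
> > that was passed to pause():
> >
> > Set<TopicPartition> current = consumer.assignment();
> > if (!current.equals(new HashSet<>(Arrays.asList(pausedPartitions)))) {
> >     // the assignment changed, so a rebalance has probably happened
> > }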
> >
> > Regarding the refresh after a metadata refresh request, that will not
> > happen if you are committing after each record. I have a session timeout
> > of 30000 ms, and if I commit the last processed records before the
> > session times out then everything is fine, except that after a metadata
> > refresh request I see a rebalance which causes "Error UNKNOWN_MEMBER_ID
> > occurred while committing offsets" on further commits from the consumer
> > until the next poll. This means that even committing at regular
> > intervals (which sends a heartbeat) somehow does not save the consumer
> > from timing out during a metadata refresh. This issue does not happen if
> > I am committing after each record, that is every 2-4 seconds, or if a
> > commit happens right after the metadata refresh response.
> >
> > Regards,
> > Vinay Sharma
> >
> >
> > On Mon, Apr 25, 2016 at 11:27 AM, Phil Luckhurst <
> > phil.luckhu...@encycle.com
> > > wrote:
> >
> > > Hi Vinay,
> > >
> > > I'm currently calling commitSync(Map<TopicPartition,
> > > OffsetAndMetadata> offsets) after each message to write just the
> > > partition offset for that specific message. Our messages can take
> > > several seconds to process, and this only seems to add 1 or 2
> > > milliseconds to that time, so it does not look like a significant
> > > overhead and it acts as our heartbeat.
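> > >
> > > In outline it looks roughly like this (process() stands in for our
> > > message handling):
> > >
> > > for (ConsumerRecord<String, String> record : records) {
> > >     process(record);
> > >     // the committed offset is the next offset to read, hence offset + 1
> > >     consumer.commitSync(Collections.singletonMap(
> > >         new TopicPartition(record.topic(), record.partition()),
> > >         new OffsetAndMetadata(record.offset() + 1)));
> > > }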
> > >
> > > WRT " I see that kafka sends a metadata refresh request after every
> > > 300000 ms (default) and even though nothing changed (no new
> > > consumer, broker or
> > > partition) this refresh generally triggers a rebalance  (at least in
> > > my
> > > tests) ."
> > >
> > > I'm not seeing this. We've got a ConsumerRebalanceListener
> > > implemented on our consumers and I don't see it get called,
> > > even though I see lots of metadata requests being sent. We can also
> > > have quiet periods where we often exceed the 300000 ms refresh
> > > default, and those metadata requests don't trigger a rebalance either.
> > >
> > > I'm calling consumer.pause(consumer.assignment().toArray(new
> > > TopicPartition[0])) at the start of each batch and
> > > consumer.resume(consumer.assignment().toArray(new
> > > TopicPartition[0])) at the end. This allows us to call poll(0) in the
> > > message loop if we need to block on a message for more than
> > > session.timeout.ms (this can happen if an external system is
> > > temporarily unavailable). Again this seems to work ok and does not
> > > trigger a rebalance.
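> > >
> > > Roughly like this, using the 0.9 varargs pause/resume API (process()
> > > again stands in for our message handling):
> > >
> > > consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
> > > try {
> > >     for (ConsumerRecord<String, String> record : batch) {
> > >         // returns no records while paused, but keeps the session alive
> > >         consumer.poll(0);
> > >         process(record);
> > >     }
> > > } finally {
> > >     consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));
> > > }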
> > >
> > > The only issue we've found is, as mentioned before, where a rebalance
> > > occurs while we are processing a batch of messages. When that
> > > happens the commitSync fails with a KafkaException whose message
> > > states this is due to a rebalance. We'd like to skip the rest of the
> > > batch when this happens, but to do that we'd need to know for sure
> > > that it was because of a rebalance, and a KafkaException could be
> > > thrown for other reasons. A KafkaRebalanceException, or even a method
> > > we could call on the consumer, would allow us to safely abort the
> > > current processing loop knowing that the remaining messages would be
> > > picked up by another consumer after the rebalance - that would stop us
> > > processing duplicates.
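> > >
> > > For now all we can do is treat any KafkaException from the commit as
> > > a possible rebalance, something like this sketch (log is our own
> > > logger):
> > >
> > > try {
> > >     consumer.commitSync(offsets);
> > > } catch (KafkaException e) {
> > >     // may be a rebalance, but could be some other failure - we can't tell
> > >     log.warn("Commit failed, abandoning the rest of the batch", e);
> > >     break; // inside the batch loop: skip the remaining messages
> > > }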
> > >
> > > Thanks
> > > Phil Luckhurst
> > >
> > >
> > > -----Original Message-----
> > > From: vinay sharma [mailto:vinsharma.t...@gmail.com]
> > > Sent: 22 April 2016 14:24
> > > To: users@kafka.apache.org
> > > Subject: Re: Detecting rebalance while processing ConsumerRecords
> > > (0.9.0.1)
> > >
> > > Hi Phil,
> > >
> > > Regarding pause and resume, I have not tried this approach but I
> > > think it may not be feasible. If your consumer no longer has the
> > > partition assigned from which the record being processed was fetched,
> > > or even if the partition is somehow assigned to the consumer again,
> > > you may still not be able to do this and will see UNKNOWN_MEMBER_ID or
> > > ILLEGAL_GENERATION_ID errors in the logs. Let me know if this approach
> > > works for you; I will also try it then.
> > >
> > > Kafka never sends the same record to 2 consumers in the same
> > > consumer group. If another consumer got the same record then it means
> > > that the first consumer no longer has that particular partition
> > > assigned to it. Even if you want to commit from the first consumer
> > > you will not be able to, and the commit will throw an exception. Even
> > > after increasing the session timeout a rebalance can still occur. I
> > > see that Kafka sends a metadata refresh request every 300000 ms
> > > (default), and even though nothing changed (no new consumer, broker
> > > or partition) this refresh generally triggers a rebalance (at least
> > > in my tests).
> > >
> > > Calling commitSync renews the session and keeps the consumer alive.
> > > commitSync is a blocking operation, so you may not want to call it
> > > for every record processed. You can try calling commitSync(offsets)
> > > just before starting to process a record, but only if, let's say, 75%
> > > of the configured session time has elapsed. This will keep your
> > > consumer alive during rare longer processing times and will also not
> > > commit each record. But as I said earlier, this will not guarantee
> > > that a rebalance never happens. A metadata refresh or other change
> > > may still trigger a rebalance, but with this approach at least a
> > > rebalance will not occur because of a session timeout during a longer
> > > processing time.
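> > >
> > > For example, a sketch of that idea (sessionTimeoutMs mirrors your
> > > session.timeout.ms and process() is a stand-in for the real work):
> > >
> > > long sessionTimeoutMs = 30000;
> > > long lastCommitMs = System.currentTimeMillis();
> > > Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();
> > > for (ConsumerRecord<String, String> record : records) {
> > >     // commit (which also heartbeats) once 75% of the session has elapsed
> > >     if (!pending.isEmpty()
> > >             && System.currentTimeMillis() - lastCommitMs > sessionTimeoutMs * 3 / 4) {
> > >         consumer.commitSync(pending);
> > >         pending.clear();
> > >         lastCommitMs = System.currentTimeMillis();
> > >     }
> > >     process(record);
> > >     pending.put(new TopicPartition(record.topic(), record.partition()),
> > >             new OffsetAndMetadata(record.offset() + 1));
> > > }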
> > >
> > > If you maintain offsets outside of Kafka and configure consumers to
> > > coordinate with each other through these external offsets, then you
> > > can skip processing duplicate records even if Kafka sends the same
> > > records twice. Through these external offsets you will have to devise
> > > a way to skip a record if it was already processed by another
> > > consumer, or to wait if the same record is being processed by another
> > > consumer.
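> > >
> > > A rough sketch of that pattern from the KafkaConsumer javadoc, where
> > > the topic name and offsetStore (the external store) are hypothetical:
> > >
> > > consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
> > >     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> > >         for (TopicPartition tp : partitions)
> > >             offsetStore.save(tp, consumer.position(tp)); // persist progress externally
> > >     }
> > >     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> > >         for (TopicPartition tp : partitions)
> > >             consumer.seek(tp, offsetStore.read(tp)); // resume from the external offset
> > >     }
> > > });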
> > >
> > >
> > > Regards,
> > > Vinay Sharma
> > >
> > >
> > >
> > > On Thu, Apr 21, 2016 at 2:09 PM, Phil Luckhurst <
> > > phil.luckhu...@encycle.com>
> > > wrote:
> > >
> > > > Thanks for all the responses. Unfortunately it seems that
> > > > currently there is no foolproof solution to this. It's not a
> > > > problem with the stored offsets, as it will happen even if I do a
> > > > commitSync after each record is processed. It's the unprocessed
> > > > records in the batch that get processed twice.
> > > >
> > > > I'm now taking the approach of trying to limit the possibility of
> > > > a rebalance as much as possible by reducing the data returned by
> > > > poll. I'm also using the pause, poll, resume pattern to ensure the
> > > > consumer doesn't cause a rebalance if the processing loop takes
> > > > longer than session.timeout.ms.
> > > >
> > > > Cheers,
> > > > Phil
> > > >
> > > >
> > > >
> > > > On 21 Apr 2016 16:24, vinay sharma <vinsharma.t...@gmail.com>
> > > > wrote:
> > > > >Hi,
> > > > >
> > > > >By design Kafka ensures it does not send the same record to multiple
> > > > >consumers in the same consumer group. The issue arises because of a
> > > > >rebalance while processing is going on and records are not yet
> > > > >committed. In my view there are only 2 possible solutions to it:
> > > > >1) As mentioned in the documentation, store offsets outside of Kafka
> > > > >(https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html).
> > > > >This is a complete solution but will definitely add extra
> > > > >development and also extra processing to each message. A problem
> > > > >may still exist if, at the time of a crash, the consumer was out of
> > > > >sync with both the external custom offset storage and the offsets
> > > > >stored in Kafka.
> > > > >2) As mentioned in the fix for defect 919 (
> > > > >https://issues.apache.org/jira/browse/KAFKA-919), set autocommit
> > > > >to true.
> > > > >This will make Kafka commit fetched records before rebalancing.
> > > > >The only drawback is that some records may never be processed if the
> > > > >consumer crashes while processing records which are already
> > > > >marked committed due to the rebalance.
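> > > > >
> > > > >For example (a sketch; the interval value is only an illustration):
> > > > >
> > > > >props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
> > > > >props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");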
> > > > >
> > > > >Regards,
> > > > >Vinay Sharma
> > > >
> > >
> >
>
