Hi Phil,

Regarding pause and resume, I have not tried this approach, but I think it may not be feasible. If your consumer no longer has the partition assigned from which the record being processed was fetched, or even if that partition is somehow assigned to the consumer again, you may still not be able to do this and will see UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION errors in the logs. Let me know if this approach works for you; I will also try it then.
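For reference, this is roughly how I understand the pause/poll/resume loop you are describing. It is only a sketch of the pattern, not something I have verified works; the topic, group id and processing code are placeholders, and it assumes the 0.9.x consumer API, where pause() and resume() take TopicPartition varargs:

```java
// Rough sketch only, not tested. Topic name, group id and process() are placeholders.
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausePollResumeSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "my-group");                   // placeholder
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic")); // placeholder

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            if (records.count() == 0) {
                continue;
            }
            TopicPartition[] assigned =
                    consumer.assignment().toArray(new TopicPartition[0]);
            // Pause all assigned partitions so the poll() calls below keep the
            // consumer in the group but do not fetch any more data.
            consumer.pause(assigned);
            try {
                for (ConsumerRecord<String, String> record : records) {
                    process(record);   // potentially slow processing
                    consumer.poll(0);  // returns nothing while paused, keeps consumer alive
                }
                consumer.commitSync(); // commit the batch once it is fully processed
            } finally {
                // If a rebalance happened anyway, the assignment may have changed
                // and this resume() can fail -- the concern mentioned above.
                consumer.resume(assigned);
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for the real work
    }
}
```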
Kafka never sends the same record to two consumers in the same consumer group. If another consumer got the same record, it means the first consumer no longer has that particular partition assigned to it. Even if you want to commit from the first consumer you will not be able to; the commit will throw an exception.

Even after increasing session.timeout.ms, a rebalance can still occur. I see that Kafka sends a metadata refresh request every 300000 ms (the default), and even though nothing changed (no new consumer, broker or partition) this refresh generally triggers a rebalance (at least in my tests).

Calling commitSync renews the session and keeps the consumer alive. commitSync is a blocking operation, so you may not want to call it for every record you process. You can try calling commitSync(offsets) just before starting to process a record, but only if, say, 75% of the configured session timeout has elapsed (a rough sketch of this idea is at the end of this mail). This will keep your consumer alive during the occasional longer processing time without committing every record. But as I said earlier, this does not guarantee that a rebalance will not happen; a metadata refresh or other change may still trigger one. If you take the above approach, though, at least a rebalance will not occur because of a session timeout during a longer processing time.

If you maintain offsets outside of Kafka and configure consumers to coordinate with each other through these external offsets, then you can skip processing duplicate records even if Kafka sends the same records twice. Through these external offsets you will have to devise a way to skip a record that has already been processed by another consumer, or to wait if the same record is currently being processed by another consumer.

Regards,
Vinay Sharma

On Thu, Apr 21, 2016 at 2:09 PM, Phil Luckhurst <phil.luckhu...@encycle.com> wrote:
> Thanks for all the responses. Unfortunately it seems that currently there
> is no fool proof solution to this. It's not a problem with the stored
> offsets as it will happen even if I do a commitSync after each record is
> processed. It's the unprocessed records in the batch that get processed
> twice.
>
> I'm now taking the approach of trying to limit the possibility of a
> rebalance as much as possible by reducing the data returned by poll. I'm
> also using the pause, poll, resume pattern to ensure the consumer doesn't
> cause a rebalance if the processing loop takes longer than
> session.timeout.ms.
>
> Cheers,
> Phil
>
> On 21 Apr 2016 16:24, vinay sharma <vinsharma.t...@gmail.com> wrote:
> >Hi,
> >
> >By design Kafka does ensure not to send same record to multiple
> >consumers in same consumer group. Issue is because of rebalance while a
> >processing is going on and records are not yet commited. In my view
> >there are only 2 possible solutions to it
> >1) As mentioned in documentation, store offsets outside of kafka (
> >https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> >). This is a complete solution but will definitely add extra
> >developement and also extra processing to each message. Problem may
> >still exist if at the time of a crash consumer was out of sync from
> >external custom offset storage and offsets stored in kafka both.
> >2) As mentioned in fix for defect 919 (
> >https://issues.apache.org/jira/browse/KAFKA-919) set autocommit to
> >true. This will make kafka commit fetched records before rebalancing.
> >Only drawback is that some records may never be processed if consumer
> >crashes while processing records which are already marked committed
> >due to rebalance.
> >
> >Regards,
> >Vinay Sharma
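Here is the rough sketch of the conditional commitSync idea mentioned above: commit just before processing a record only when a large part of the session timeout has already elapsed. This is not tested; the group id, topic, timeout value and the 75% threshold are only placeholders/examples:

```java
// Rough sketch only, not tested. Group id, topic, timeout and threshold are placeholders.
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConditionalCommitSketch {

    private static final long SESSION_TIMEOUT_MS = 30000; // keep in sync with session.timeout.ms
    private static final long COMMIT_AFTER_MS = (long) (SESSION_TIMEOUT_MS * 0.75);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "my-group");                   // placeholder
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", String.valueOf(SESSION_TIMEOUT_MS));
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic")); // placeholder

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            long lastContact = System.currentTimeMillis();

            for (ConsumerRecord<String, String> record : records) {
                // Commit only when a large part of the session timeout has
                // already passed, instead of blocking on every record.
                if (System.currentTimeMillis() - lastContact > COMMIT_AFTER_MS) {
                    TopicPartition tp =
                            new TopicPartition(record.topic(), record.partition());
                    // Committing record.offset() marks everything before this
                    // record as processed; the record about to be processed is
                    // not committed yet.
                    consumer.commitSync(Collections.singletonMap(
                            tp, new OffsetAndMetadata(record.offset())));
                    lastContact = System.currentTimeMillis();
                }
                process(record); // potentially slow processing
            }
            consumer.commitSync(); // commit the whole batch once it is done
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for the real work
    }
}
```

The final commitSync() after the loop still commits the whole batch; the conditional commit inside the loop is only there to keep the consumer alive during a long-running batch without committing on every record.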