Note that Kafka is not designed to prevent duplicate records anyway. For example, if your app writes to an external system (such as a database) once per consumer record, and you commit the offset synchronously after every consumer record, you can still end up with duplicate messages. Here's the case worked through:
Usually your consumer will be going through a loop of:

1. Iterate over messages from poll()
2. Write to external system
3. Commit offset of message

If the consumer crashes or a rebalance happens between the write to the
external system and the offset commit, the record is redelivered and you
get a duplicate write to your external system, no matter what. The only
real solution is to make your writes to the external system tolerate
duplicates (that is, they have to be idempotent).

On Thu, Apr 21, 2016 at 3:49 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:

> regarding pause and resume approach, I think there will still be a chance
> that you end up processing duplicate records. Rebalance can still get
> triggered due to numerous reasons while you are processing records.
>
> On Thu, Apr 21, 2016 at 10:34 AM, vinay sharma <vinsharma.t...@gmail.com>
> wrote:
>
> > I was also struggling with this problem. I have found one way to do it
> > without making consumers aware of each other's processing or assignment
> > state. You can set autocommit to true. Irrespective of autocommit
> > interval, setting autocommit true will make kafka commit all records
> > already sent to consumers.
> >
> > On Thu, Apr 21, 2016 at 5:18 AM, Phil Luckhurst <
> > phil.luckhu...@encycle.com> wrote:
> >
> >> This is an example of the scenario I'm trying to avoid where 2 consumers
> >> end up processing the same records from a partition at the same time.
> >>
> >> 1. I have a topic with 2 partitions and two consumers A and B which
> >>    have each been assigned a partition from the topic.
> >>
> >> 2. Consumer A is processing a batch of 100 records from partition 1
> >>    and it takes much longer than expected so after 20 records the
> >>    consumer appears dead so a rebalance occurs.
> >>
> >> 3. Consumer B now gets assigned both partitions and starts
> >>    processing partition 1 from the last committed offset by consumer A.
> >>
> >> 4. Consumer A which, unknown to it, was the cause of the rebalance
> >>    is still processing the remaining records in its batch of 100 which
> >>    are the same records that Consumer B is also now processing from the
> >>    same partition.
> >>
> >> Is there a way that I can detect in the ConsumerRecords loop of Consumer
> >> A that it has been marked dead and should skip the remaining records in
> >> the batch and do the next poll() which will cause it to trigger another
> >> rebalance and rejoin the group? I have added a ConsumerRebalanceListener
> >> but as onPartitionsRevoked() only gets called when poll() is called I
> >> don't get notified until all the records from the current poll batch
> >> have been processed.
> >>
> >> This shows where I think I need the check.
> >>
> >> while (true) {
> >>     ConsumerRecords<String, String> records = consumer.poll(100);
> >>     records.forEach((ConsumerRecord<String, String> crs) -> {
> >>         // I think I need a check here to jump out the loop and
> >>         // call poll() again?
> >>         if (consumer.isDead())
> >>             continue;
> >>     });
> >> }
> >>
> >> I've looked at the suggestions for using pause and resume so that poll()
> >> can be called during the record processing loop which might allow me to
> >> do it like this?
> >>
> >> 1. Call pause at the start of the loop.
> >>
> >> 2. Call poll(0) in the loop which will trigger the call to
> >>    ConsumerRebalanceListener onPartitionsRevoked() and perform the
> >>    rebalance.
> >>
> >> 3. If ConsumerRebalanceListener onPartitionsRevoked() was called
> >>    then I would call resume and break out of the record processing
> >>    loop so the main poll() request is called again.
> >>
> >> 4. Call resume at the end of the record processing loop.
> >>
> >> Is that a viable solution to the problem or is there a better way to do
> >> this?
> >>
> >> Thanks
> >> Phil Luckhurst
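
To make the write-then-commit window concrete, here is a rough sketch in plain Java. It does not use the Kafka client at all: the "log" is just a list, the commit is a local variable, and every name in it (DuplicateWriteDemo, Sink, AppendSink, UpsertSink, consume) is made up for illustration. It only assumes that the external system can key a row by partition and offset, which is one common way to make writes idempotent, not the only one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the consume loop; no Kafka client is involved.
final class DuplicateWriteDemo {

    /** Stand-in for the external system (for example a database). */
    interface Sink {
        void write(int partition, long offset, String value);
    }

    /** Append-only sink: a redelivered record produces a second row. */
    static final class AppendSink implements Sink {
        final List<String> rows = new ArrayList<>();

        @Override
        public void write(int partition, long offset, String value) {
            rows.add(value);
        }
    }

    /** Idempotent sink: rows are keyed by partition and offset, so a redelivery overwrites. */
    static final class UpsertSink implements Sink {
        final Map<String, String> rows = new HashMap<>();

        @Override
        public void write(int partition, long offset, String value) {
            rows.put(partition + "-" + offset, value);
        }
    }

    /**
     * Processes the log from the committed offset: write first, then commit.
     * A crash is simulated after the write at crashAfterWriteAt (pass -1 for
     * no crash). Returns the offset that was committed when the run ended.
     */
    static long consume(List<String> log, long committed, long crashAfterWriteAt, Sink sink) {
        for (long offset = committed; offset < log.size(); offset++) {
            sink.write(0, offset, log.get((int) offset)); // write to external system
            if (offset == crashAfterWriteAt) {
                return committed; // crash: the commit below never happens
            }
            committed = offset + 1; // "synchronous commit" of the next offset to read
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c");

        AppendSink append = new AppendSink();
        long committed = consume(log, 0, 1, append); // crash after writing offset 1
        consume(log, committed, -1, append);         // restart from last commit
        System.out.println("append-only rows: " + append.rows.size()); // prints 4

        UpsertSink upsert = new UpsertSink();
        committed = consume(log, 0, 1, upsert);
        consume(log, committed, -1, upsert);
        System.out.println("keyed upsert rows: " + upsert.rows.size()); // prints 3
    }
}
```

The record at offset 1 is written twice in both runs, because the restart resumes from the last committed offset. The append-only sink ends up with an extra row, while the keyed sink just overwrites the same row, which is what "accepting of duplicates" amounts to in practice.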