Thanks for your reply. I see the duplicates when I bring a broker down and back up while the load test is in progress. If I keep it down for the whole test, everything is fine.

I will try the commit modes you mentioned earlier and now, and monitor the performance.
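Roughly what I plan to try, for reference (an untested sketch; running and
process() are placeholders, and process() would do the DB write synchronously
before the offset is committed):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "XXX.221:9092,XXX.222:9092");
    props.put("group.id", "EOTG");
    props.put("enable.auto.commit", "false"); // manual commits instead of auto
    props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("TestTopic2807"));
    try {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                process(record); // persist to DB before committing the offset
            }
            consumer.commitAsync(); // fast path; a later commit supersedes a failed one
        }
    } finally {
        try {
            consumer.commitSync(); // one final synchronous commit on shutdown
        } finally {
            consumer.close();
        }
    }

As I understand it this is still at-least-once, so a rebalance can replay
whatever was processed after the last successful commit, but the window should
be much smaller than with a 10-second auto-commit interval.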
On Tue, Aug 2, 2016 at 12:57 PM, R Krishna <krishna...@gmail.com> wrote:

> Sure, a rebalance is a normal cause of duplicates.
> Sure, "as I lower the value of auto.commit.interval.ms, the performance
> deteriorates drastically", but you should also see fewer duplicates. Did
> you try committing asynchronously, or storing offsets somewhere else?
>
> On Aug 1, 2016 10:59 PM, "Amit K" <amitk....@gmail.com> wrote:
>
> > Thanks for the reply.
> >
> > On the producer side I have acks=all with 3 retries; the rest are
> > mostly default properties.
> >
> > With a replication factor of 2, I believe the partitions of the downed
> > broker will be read from the other broker, but I doubt that alone would
> > lead to duplicate reads to the extent I observed (~200-800). Moreover
> > it is not consistent: sometimes the count goes up, sometimes down.
> >
> > I think the duplicates get in when a rebalance happens and the
> > consumers resume reading from the last committed offset.
> >
> > One thing I observed is that with the following properties I get the
> > best performance (with far fewer duplicates):
> >
> > enable.auto.commit=true
> > auto.commit.interval.ms=10000
> > session.timeout.ms=30000
> >
> > As I lower the value of auto.commit.interval.ms, the performance
> > deteriorates drastically.
> >
> > What I may need to try, please correct me if I have got it completely
> > wrong, is the async commit mode, to see how it performs.
> >
> > Also, as I mentioned, a bug of the same kind was reported against
> > kafka-python; could the same thing be happening with the Java client?
> >
> > Thanks,
> >
> > On Tue, Aug 2, 2016 at 3:46 AM, R Krishna <krishna...@gmail.com> wrote:
> >
> > > What about failed async commits in this case, due to the downed
> > > broker? Could that not cause the consumer to read messages again,
> > > since the offsets may not have been successfully updated?
> > >
> > > On Mon, Aug 1, 2016 at 11:35 AM, Tauzell, Dave
> > > <dave.tauz...@surescripts.com> wrote:
> > >
> > > > If you kill a broker, then any uncommitted messages will be
> > > > replayed.
> > > >
> > > > -Dave
> > > > ________________________________________
> > > > From: R Krishna <krishna...@gmail.com>
> > > > Sent: Monday, August 1, 2016 1:32 PM
> > > > To: users@kafka.apache.org
> > > > Subject: Re: Kafka java consumer processes duplicate messages
> > > >
> > > > I remember reading about these options for higher consumer
> > > > guarantees:
> > > >
> > > > unclean.leader.election.enable=false (a broker setting)
> > > > enable.auto.commit=false on the consumer side
> > > > Commit after processing, i.e., call commitSync() regularly
> > > >
> > > > What about your producer: does it wait until a write reaches all
> > > > replicas in the ISR, i.e., acks=all, or none? I am not sure whether
> > > > this can cause the consumer to read duplicates; I know there can
> > > > definitely be data loss from data not being replicated.
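> > > > On the producer side that would look roughly like this (a sketch;
> > > > the broker list is assumed from the rest of the thread):
> > > >
> > > > import java.util.Properties;
> > > > import org.apache.kafka.clients.producer.KafkaProducer;
> > > >
> > > > Properties p = new Properties();
> > > > p.put("bootstrap.servers", "XXX.221:9092,XXX.222:9092");
> > > > p.put("acks", "all");  // wait for the full ISR to acknowledge
> > > > p.put("retries", "3"); // note: a retried send that actually
> > > >                        // succeeded the first time is written twice,
> > > >                        // another possible source of duplicates
> > > > p.put("key.serializer",
> > > >         "org.apache.kafka.common.serialization.StringSerializer");
> > > > p.put("value.serializer",
> > > >         "org.apache.kafka.common.serialization.StringSerializer");
> > > > KafkaProducer<String, String> producer =
> > > >         new KafkaProducer<String, String>(p);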
> > > >
> > > > On Mon, Aug 1, 2016 at 10:11 AM, Amit K <amitk....@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I am fairly new to Kafka. I have set up a 3-node Kafka cluster
> > > > > (one broker per machine) with a 3-node ZooKeeper cluster, on
> > > > > Kafka 0.9.0.0.
> > > > >
> > > > > The setup works fine: a single producer pushes JSON strings to a
> > > > > topic with 3 partitions and a replication factor of 2.
> > > > >
> > > > > On the consumer side I have an application with 3 consumer
> > > > > threads (I expect each consumer thread to read from its own
> > > > > dedicated partition). Each consumer reads the JSON and persists
> > > > > it to the DB in a separate thread. The consumer properties are:
> > > > >
> > > > > topic=TestTopic2807
> > > > > bootstrap.servers=XXX.221:9092,XXX.222:9092,XXX.221:9092
> > > > > topic.consumer.threads=3
> > > > > group.id=EOTG
> > > > > client.id=EOTG
> > > > > enable.auto.commit=true
> > > > > auto.commit.interval.ms=10000
> > > > > session.timeout.ms=30000
> > > > > key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
> > > > > value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
> > > > >
> > > > > Each consumer runs the following routine in its own thread and
> > > > > spawns a new thread for the DB operation (I know a failed DB
> > > > > operation can be an issue; I will fix that soon):
> > > > >
> > > > > ConsumerRecords<String, String> records = consumer.poll(20);
> > > > > if (!records.isEmpty()) {
> > > > >     for (ConsumerRecord<String, String> record : records) {
> > > > >         String eOCJSONString = record.value();
> > > > >         logger.info("Received record at consumer id: " + consumerId
> > > > >                 + ". Topic: " + record.topic()
> > > > >                 + ". Partition: " + record.partition()
> > > > >                 + ". Offset: " + record.offset());
> > > > >         logger.info("\nRecord: " + eOCJSONString);
> > > > >         if (eOCJSONString.startsWith("{")) {
> > > > >             EOCBean eocBean = gson.fromJson(eOCJSONString, EOCBean.class);
> > > > >             executorServiceWorker.submit(new OCWorker(eocBean, consumerId));
> > > > >         }
> > > > >         // ...
> > > > >     }
> > > > > }
> > > > >
> > > > > The problem occurs when I load test the application, sending 30k
> > > > > messages (JSONs) from the single producer, and bring one of the
> > > > > brokers down while the consumers are consuming. Many of the
> > > > > messages (~200-800) are processed twice. I repeated the
> > > > > experiment a few times and always saw many messages read in
> > > > > duplicate by the consumer threads. I tried bringing down one
> > > > > broker, and then two.
> > > > >
> > > > > Is it normal for this to happen?
> > > > > Should I switch to manual offset commits instead of auto commit?
> > > > > Or should I manually assign partitions in the program rather than
> > > > > let the group coordinator manage them (see the sketch below)?
> > > > > Am I missing something very important here?
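> > > > >
> > > > > By manual assignment I mean something like this instead of
> > > > > subscribe() (a sketch; partitionId is a placeholder for the
> > > > > partition each thread would own):
> > > > >
> > > > > import java.util.Arrays;
> > > > > import org.apache.kafka.common.TopicPartition;
> > > > >
> > > > > TopicPartition part = new TopicPartition("TestTopic2807", partitionId);
> > > > > consumer.assign(Arrays.asList(part));
> > > > > // No group rebalancing happens with assign(): this thread owns
> > > > > // the partition until it is explicitly reassigned, so a broker
> > > > > // bounce cannot shuffle partitions between the threads.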
> > > > >
> > > > > Also, I observed that kafka-python had a similar bug that was
> > > > > fixed in 0.9.2 (https://github.com/dpkp/kafka-python/issues/189),
> > > > > but I do not believe any such issue has been reported against the
> > > > > Java client.
> > > > >
> > > > > Thanks,