Thanks for your reply.

I see the duplicates when I bring a broker down and back up while the load test
is in progress. If I keep it down for the whole test, everything is fine.

I will try the commit modes you mentioned earlier and in this reply, and monitor
the performance.
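
For reference, this is roughly the mode I plan to try first: auto commit disabled
and a synchronous commit only after the whole polled batch has been processed.
This is only a sketch against the 0.9 consumer API; the broker list is a
placeholder and process() stands in for my JSON parse and DB write.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder
        props.put("group.id", "EOTG");
        props.put("enable.auto.commit", "false"); // commit explicitly instead
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Collections.singletonList("TestTopic2807"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // parse JSON and persist to DB
                }
                // Commit only after the batch is fully processed. A crash or
                // rebalance can still replay at most the last batch, but commits
                // no longer race with processing the way the 10s auto commit does.
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // application-specific handling goes here
    }
}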

On Tue, Aug 2, 2016 at 12:57 PM, R Krishna <krishna...@gmail.com> wrote:

> Sure, a rebalance is a normal cause of duplicates.
> And yes, "as I lower the value of auto.commit.interval.ms, the performance
> deteriorates drastically", but you should see fewer duplicates. Did you try
> async commits, or storing the offsets somewhere else?
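>
> By storing offsets somewhere else I mean keeping them in the same store as the
> processed data (your DB) and seeking to them on assignment, so a commit can
> never get ahead of the actual writes. A very rough sketch of the idea, reusing
> the props and imports from your consumer; offsetFromDb() is a hypothetical
> helper that reads the last stored offset for a partition:
>
> final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> consumer.subscribe(Collections.singletonList("TestTopic2807"),
>         new ConsumerRebalanceListener() {
>             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>                 // flush any in-flight DB writes (data + offset) before losing the partitions
>             }
>             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>                 for (TopicPartition tp : partitions) {
>                     consumer.seek(tp, offsetFromDb(tp)); // resume from offsets stored with the data
>                 }
>             }
>         });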
> On Aug 1, 2016 10:59 PM, "Amit K" <amitk....@gmail.com> wrote:
>
> > Thanks for the reply,
> >
> > On the producer side I have acks=all with 3 retries; the rest are mostly
> > default properties.
> >
> > With a replication factor of 2, I believe the messages from the downed
> > broker's partitions will be read from the other replica, but I doubt that
> > alone would lead to duplicate reads on the scale I observed (~200-800).
> > Moreover, it is not consistent; sometimes the count goes up and sometimes
> > down.
> >
> > I think the duplicates come in when a rebalance happens and the consumers
> > start reading again from the last committed offset.
> >
> > One thing I observed is that with the following properties:
> >
> > enable.auto.commit=true
> > auto.commit.interval.ms=10000
> > session.timeout.ms=30000
> >
> > I get the best performance (with far fewer duplicates). As I lower the value
> > of auto.commit.interval.ms, the performance deteriorates drastically.
> >
> > What I should probably try next, please correct me if I have got this
> > completely wrong, is the async commit mode, to see how it performs (rough
> > sketch below).
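> >
> > Something like this inside the poll loop, I think (fragment only; it needs
> > the usual org.apache.kafka.clients.consumer and org.apache.kafka.common
> > imports, and the callback just logs commits that fail, e.g. during a
> > rebalance):
> >
> > consumer.commitAsync(new OffsetCommitCallback() {
> >     @Override
> >     public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
> >                            Exception exception) {
> >         if (exception != null) {
> >             logger.warn("Offset commit failed for " + offsets, exception);
> >         }
> >     }
> > });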
> >
> > Also, as I mentioned, a bug of the same kind was reported against
> > kafka-python; could the same thing be happening here with the Java client?
> >
> > Thanks,
> >
> > On Tue, Aug 2, 2016 at 3:46 AM, R Krishna <krishna...@gmail.com> wrote:
> >
> > > What about async commits that fail in this case because of the downed
> > > broker? Can that not cause the consumer to read the records again, since
> > > the offsets may not have been successfully updated?
> > >
> > > On Mon, Aug 1, 2016 at 11:35 AM, Tauzell, Dave <dave.tauz...@surescripts.com> wrote:
> > >
> > > > If you kill a broker, then any uncommitted messages will be replayed.
> > > >
> > > > -Dave
> > > > ________________________________________
> > > > From: R Krishna <krishna...@gmail.com>
> > > > Sent: Monday, August 1, 2016 1:32 PM
> > > > To: users@kafka.apache.org
> > > > Subject: Re: Kafka java consumer processes duplicate messages
> > > >
> > > > I remember reading about these options for higher consumer guarantees:
> > > > unclean.leader.election.enable = false    (broker side)
> > > > enable.auto.commit = false    (consumer side)
> > > > commit after processing, i.e. call commitSync() regularly
> > > >
> > > > What about your producer: does it wait until a write reaches all
> > > > replicas in the ISR, i.e. acks=all, or none? I am not sure whether this
> > > > can cause the consumer to read duplicates, but it can definitely cause
> > > > data loss, because the data may not be replicated.
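> > > >
> > > > For reference, the producer-side settings I mean look roughly like this
> > > > (sketch only; the broker list is a placeholder, and note that retried
> > > > sends are themselves one source of duplicate messages on the topic):
> > > >
> > > > Properties props = new Properties();
> > > > props.put("bootstrap.servers", "broker1:9092,broker2:9092");
> > > > props.put("acks", "all");      // wait for all in-sync replicas
> > > > props.put("retries", "3");
> > > > props.put("key.serializer",
> > > >         "org.apache.kafka.common.serialization.StringSerializer");
> > > > props.put("value.serializer",
> > > >         "org.apache.kafka.common.serialization.StringSerializer");
> > > > KafkaProducer<String, String> producer = new KafkaProducer<>(props);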
> > > >
> > > > On Mon, Aug 1, 2016 at 10:11 AM, Amit K <amitk....@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I am fairly new to Kafka. I have set up a 3-node Kafka cluster (1
> > > > > broker per machine) with a 3-node ZooKeeper cluster. I am using Kafka
> > > > > version 0.9.0.0.
> > > > >
> > > > > The setup works fine: from a single producer I push JSON strings to a
> > > > > topic with 3 partitions and a replication factor of 2. On the consumer
> > > > > side I have an application with 3 consumer threads (I assume each
> > > > > consumer thread reads from its own dedicated partition). Each consumer
> > > > > reads the JSON and persists it to the DB in a separate thread. The
> > > > > consumer properties are as follows:
> > > > >
> > > > > topic=TestTopic2807
> > > > > bootstrap.servers=XXX.221:9092,XXX.222:9092,XXX.221:9092
> > > > > topic.consumer.threads=3
> > > > > group.id=EOTG
> > > > > client.id=EOTG
> > > > > enable.auto.commit=true
> > > > > auto.commit.interval.ms=10000
> > > > > session.timeout.ms=30000
> > > > > key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
> > > > > value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
> > > > >
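> > > > > For completeness, this is roughly how the topic.consumer.threads
> > > > > property is used: one KafkaConsumer per thread (the consumer is not
> > > > > thread safe), all in the same group, so each of the 3 partitions ends
> > > > > up with one consumer. Sketch only; loadConsumerProperties() and
> > > > > runConsumerLoop() are my own helpers (they read the list above and run
> > > > > the poll loop shown below, respectively).
> > > > >
> > > > > final Properties props = loadConsumerProperties();
> > > > > ExecutorService executor = Executors.newFixedThreadPool(3); // topic.consumer.threads
> > > > > for (int i = 0; i < 3; i++) {
> > > > >     final int consumerId = i;
> > > > >     executor.submit(new Runnable() {
> > > > >         public void run() {
> > > > >             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> > > > >             consumer.subscribe(Collections.singletonList("TestTopic2807"));
> > > > >             runConsumerLoop(consumer, consumerId); // poll loop shown below
> > > > >         }
> > > > >     });
> > > > > }
> > > > >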
> > > > > The consumer thread routine is as follows. Each consumer runs the loop
> > > > > below in its own thread and spawns a new thread for the DB operation
> > > > > (I know a failed DB operation can be an issue; I will fix that soon):
> > > > >
> > > > > ConsumerRecords<String, String> records = consumer.poll(20);
> > > > > if (!records.isEmpty()) {
> > > > >     for (ConsumerRecord<String, String> record : records) {
> > > > >
> > > > >         String eOCJSONString = record.value();
> > > > >
> > > > >         logger.info("Received the record at consumer id:" + consumerId +
> > > > >                 ". Record topic:" + record.topic() +
> > > > >                 ". Record partition:" + record.partition() +
> > > > >                 ". Record offset id:" + record.offset());
> > > > >         logger.info("\n Record:" + eOCJSONString);
> > > > >
> > > > >         if (eOCJSONString.startsWith("{")) {
> > > > >             EOCBean eOCBean = gson.fromJson(eOCJSONString, EOCBean.class);
> > > > >             executorServiceWorker.submit(new OCWorker(eOCBean, consumerId));
> > > > >             :
> > > > > }
> > > > >
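> > > > > (If I switch to manual commits, the variant I have in mind is
> > > > > committing the exact offsets only after the records are handled, e.g.
> > > > > as below. Sketch only; handleSynchronously() stands in for the JSON
> > > > > parse and DB write, done inline rather than handed to the worker pool,
> > > > > since the consumer itself is not thread safe and offsets should only
> > > > > be committed from the polling thread.)
> > > > >
> > > > > Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
> > > > > for (ConsumerRecord<String, String> record : records) {
> > > > >     handleSynchronously(record);
> > > > >     toCommit.put(new TopicPartition(record.topic(), record.partition()),
> > > > >             new OffsetAndMetadata(record.offset() + 1)); // commit the *next* offset
> > > > > }
> > > > > if (!toCommit.isEmpty()) {
> > > > >     consumer.commitSync(toCommit);
> > > > > }
> > > > >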
> > > > > The problem occurs when I load test the application, sending 30k
> > > > > messages (JSONs) from a single producer, and bring down one of the
> > > > > brokers while the consumers are consuming. I observe that many of the
> > > > > messages (~200-800) are processed twice. I repeated the experiment a
> > > > > few times and always saw many messages read in duplicate by the
> > > > > consumer threads. I tried bringing down one broker and also two.
> > > > >
> > > > > Is this normal?
> > > > > Should I switch to manual offset commits rather than enabling auto commit?
> > > > > Or should I manually assign partitions in the program rather than
> > > > > letting the group manage them? (Rough sketch of that option below.)
> > > > >
> > > > > Am I missing something very important here?
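> > > > >
> > > > > (The manual assignment option from the question above would look
> > > > > roughly like this; lastProcessedOffset would have to be tracked by the
> > > > > application, and there is no automatic failover to another consumer if
> > > > > this one dies.)
> > > > >
> > > > > TopicPartition partition0 = new TopicPartition("TestTopic2807", 0);
> > > > > consumer.assign(Arrays.asList(partition0));
> > > > > consumer.seek(partition0, lastProcessedOffset + 1);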
> > > > >
> > > > > Also,
> > > > > I observed that kafka-python had a similar bug, fixed in 0.9.2
> > > > > (https://github.com/dpkp/kafka-python/issues/189), but I believe no
> > > > > such issue has been reported for the Java client.
> > > > >
> > > > > Thanks,
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Radha Krishna, Proddaturi
> > > > 253-234-5657
> > > >
> > >
> > >
> > >
> > > --
> > > Radha Krishna, Proddaturi
> > > 253-234-5657
> > >
> >
>
