I remember reading about these options for higher consumer guarantees:

    unclean.leader.election.enable=false   (broker side)
    enable.auto.commit=false               (consumer side)

and then committing manually after processing, calling commitSync() regularly.
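Something like this on the consumer side, roughly (untested sketch; the broker address and topic name below are taken from your mail, and process() is a placeholder for your DB write):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "XXX.221:9092");
    props.put("group.id", "EOTG");
    props.put("enable.auto.commit", "false");  // turn off auto commit
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("TestTopic2807"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            process(record);   // placeholder: persist to DB *before* committing
        }
        consumer.commitSync(); // commit only after the batch has been processed
    }

With auto commit enabled, offsets can be committed before your DB thread has actually processed the records (or, on a rebalance, not committed at all), which is one source of the duplicates you are seeing.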
What about your producer: does it wait until a write reaches all replicas in the ISR, i.e., acks=all, or not? I'm not sure whether that can cause the consumer to read duplicates, but it can definitely cause data loss from data not being replicated. (A rough producer sketch follows below the quoted mail.)

On Mon, Aug 1, 2016 at 10:11 AM, Amit K <amitk....@gmail.com> wrote:
> Hi,
>
> I am kind of new to Kafka. I have set up a 3-node Kafka cluster (1 broker
> per machine) with a 3-node ZooKeeper cluster. I am using Kafka version
> 0.9.0.0.
>
> The setup works fine: from my single producer I push a JSON string to a
> topic with 3 partitions and a replication factor of 2. On the consumer
> side I have an application with 3 consumer threads (I suppose each
> consumer thread will read from a corresponding dedicated partition). The
> consumer reads the JSON and persists it to the DB in a separate thread.
> The consumer properties are as follows:
>
> topic=TestTopic2807
> bootstrap.servers=XXX.221:9092,XXX.222:9092,XXX.221:9092
> topic.consumer.threads=3
> group.id=EOTG
> client.id=EOTG
> enable.auto.commit=true
> auto.commit.interval.ms=10000
> session.timeout.ms=30000
> key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
> value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
>
> The consumer routine is as follows. Each consumer runs this in its own
> thread and spawns a new thread for the DB operation (I know a DB
> operation failure can be an issue, but I will fix that soon):
>
> ConsumerRecords<String, String> records = consumer.poll(20);
> if (!records.isEmpty()) {
>     for (ConsumerRecord<String, String> record : records) {
>
>         String eocJSONString = record.value();
>
>         logger.info("Received the record at consumer id:" + consumerId +
>             ". Record topic:" + record.topic() +
>             ". Record partition:" + record.partition() +
>             ". Record offset id:" + record.offset());
>         logger.info("\n Record:" + eocJSONString);
>
>         if (eocJSONString.startsWith("{")) {
>             EOCBean eocBean = gson.fromJson(eocJSONString, EOCBean.class);
>             executorServiceWorker.submit(new OCWorker(eocBean, consumerId));
>             :
> }
>
> The problem occurs when I load-test the application, sending 30k messages
> (JSONs) from the single producer, and bring one of the brokers down while
> the consumers are consuming. I observe that many of the messages
> (~200-800) are processed in duplicate. I repeated this experiment a few
> times, bringing one or two brokers down, and always noticed many messages
> being read in duplicate by the consumer threads.
>
> Is it normal for this to happen?
> Should I switch to manual offset commits rather than auto commit?
> Or should I assign partitions manually in the program rather than let the
> brokers manage it?
>
> Am I missing something very important here?
>
> Also, I observed that kafka-python had a similar bug, which was fixed in
> 0.9.2 (https://github.com/dpkp/kafka-python/issues/189), but I believe no
> such issue has been reported for the Java client.
>
> Thanks,

--
Radha Krishna, Proddaturi
253-234-5657
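P.S. The producer sketch I mentioned above, for stronger durability (again untested; broker address, topic name, and the jsonString variable are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "XXX.221:9092");
    props.put("acks", "all");   // wait until the full ISR acknowledges the write
    props.put("retries", "3");  // retry transient failures; note retries can themselves write duplicates
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<>("TestTopic2807", jsonString));
    producer.close();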