I remember reading about these options for stronger consumer guarantees:
unclean.leader.election.enable=false    (broker side)
enable.auto.commit=false    (consumer side)
Commit after processing, i.e. call commitSync() regularly
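
To make the "commit after processing" point concrete, here is a toy model in plain Java. It makes no Kafka calls, and the class and method names are made up for illustration. It assumes one partition, a commit every N processed records (standing in for a time-based commit interval), and a consumer that stops after some number of records and then resumes from the last committed offset.

```java
public class CommitAfterProcessingSketch {

    // Returns how many records are processed twice when a consumer stops
    // after `crashAfter` records and restarts from the last committed offset.
    // `commitEvery` is the number of processed records between commits.
    static int duplicatesAfterRestart(int total, int commitEvery, int crashAfter) {
        int committed = 0;                       // next offset to read after a restart
        int[] timesProcessed = new int[total];

        // First run: process records, committing every `commitEvery` records.
        for (int offset = 0; offset < crashAfter; offset++) {
            timesProcessed[offset]++;
            if ((offset + 1) % commitEvery == 0) {
                committed = offset + 1;
            }
        }

        // Restart (or rebalance): resume from the last committed offset.
        for (int offset = committed; offset < total; offset++) {
            timesProcessed[offset]++;
        }

        int duplicates = 0;
        for (int count : timesProcessed) {
            if (count > 1) {
                duplicates++;
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Infrequent commits: everything since the last commit is replayed.
        System.out.println(duplicatesAfterRestart(30000, 500, 1234)); // prints 234
        // Commit after every processed record: nothing is replayed in this model.
        System.out.println(duplicatesAfterRestart(30000, 1, 1234));   // prints 0
    }
}
```

In this model the number of duplicates equals the number of records processed since the last commit, so committing right after processing shrinks the replay window. A real consumer can still see a duplicate if it stops between processing a record and committing its offset.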

What about your producer: does it wait until a message reaches all
replicas in the ISR, i.e., is acks=all set or not? I am not sure whether
this can cause the consumer to read duplicates, but I know there can
definitely be data loss if data has not been replicated.
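
For reference, a minimal sketch of producer settings that wait for all in-sync replicas. It uses only java.util.Properties, so it runs without the Kafka client on the classpath. The class name, the helper, and the retries value of 3 are assumptions for illustration; the property keys are standard producer configuration names.

```java
import java.util.Properties;

public class ReplicatedProducerConfig {

    // Hypothetical helper: builds producer properties that ask the leader to
    // wait for the full set of in-sync replicas before acknowledging a send.
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");    // wait for all in-sync replicas
        props.setProperty("retries", "3");   // arbitrary value, chosen for illustration
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        System.out.println("acks=" + props.getProperty("acks"));
    }
}
```

As far as I know, with retries enabled on 0.9 a send that is retried after a lost acknowledgement can be appended to the log twice, which would show up as duplicates on the consumer side regardless of how offsets are committed.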

On Mon, Aug 1, 2016 at 10:11 AM, Amit K <amitk....@gmail.com> wrote:

> Hi,
>
> I am kind of new to Kafka. I have set up a 3-node Kafka cluster (1 broker
> per machine) with a 3-node ZooKeeper cluster. I am using Kafka version
> 0.9.0.0.
>
> The setup works fine: from my single producer I push a JSON string to a
> Kafka topic with 3 partitions and a replication factor of 2. At the
> consumer end I have an application with 3 consumer threads (I suppose each
> consumer thread will read from its own dedicated partition). The consumer
> reads the JSON and persists it in the DB in a separate thread. The
> consumer properties are as follows:
>
> topic=TestTopic2807
> bootstrap.servers=XXX.221:9092,XXX.222:9092,XXX.221:9092
> topic.consumer.threads=3
> group.id=EOTG
> client.id=EOTG
> enable.auto.commit=true
> auto.commit.interval.ms=10000
> session.timeout.ms=30000
> key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
> value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
>
>
> The consumer thread routine is as follows: each consumer runs the following
> in its own thread and spawns a new thread for the DB operation (I know a DB
> operation failure can be an issue, but I will fix that soon):
>
> ConsumerRecords<String, String> records = consumer.poll(20);
> if (!records.isEmpty()) {
>     for (ConsumerRecord<String, String> record : records) {
>
>         String eOCJSONString = record.value();
>
>         logger.info("Received the records at consumer id:" + consumerId +
>             ". Record topic:" + record.topic() +
>             ". Record partition:" + record.partition() +
>             ". Record offset id:" + record.offset());
>         logger.info("\n Record:" + eOCJSONString);
>
>         // Only hand off values that look like JSON objects
>         if (eOCJSONString.startsWith("{")) {
>             OCBean ocBean = gson.fromJson(eOCJSONString, EOCBean.class);
>             // DB persistence runs asynchronously on a worker thread
>             executorServiceWorker.submit(new OCWorker(ocBean, consumerId));
>             :
> }
>
> The problem occurs when I load test the application by sending 30k messages
> (JSON) from a single producer and bring down one of the brokers while the
> consumer is consuming. I observed that many of the messages (~200-800) are
> processed more than once. I repeated this experiment a few times, bringing
> one or two brokers down, and always saw many messages read more than once
> by a consumer thread.
>
> Is this normal?
> Should I switch to manual offset commits instead of auto commit?
> Or should I manually assign the partitions in the program rather than let
> the brokers manage it?
>
> Am I missing something very important here?
>
> Also,
> I noticed that Kafka-Python had a similar bug, which was fixed in 0.9.2
> (https://github.com/dpkp/kafka-python/issues/189), but I believe no such
> issue has been reported for Java.
>
> Thanks,
>



-- 
Radha Krishna, Proddaturi
253-234-5657
