Hi,

I am kind of new to Kafka. I have set up a 3 node kafka (1 broker per
machine) cluster with 3 node zookeer cluster. I am using Kafka 0.9.0.0
version.

The set up works fine wherein from my single producer I am pushing a JSON
string to Kafka to a topic with 3 partitions and replication factor of 2.
At consumer end I have application with 3 consumer threads (I suppose each
consumer thread will read from corresponding dedicated partition). The
consumer reads the JSON and persist the same in DB in a separate thread.
Following are consumer properties:

topic=TestTopic2807
bootstrap.servers=XXX.221:9092,XXX.222:9092,XXX.221:9092
topic.consumer.threads=3
group.id=EOTG
client.id=EOTG
enable.auto.commit=true
auto.commit.interval.ms=10000
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer


The consumer thread routine is as follows: Each consumer runs following in
it's own thread and spawns a new thread for DB operation (I know DB
operation failure can be issue but will fix that sooner)

 ConsumerRecords<String, String> records = consumer.poll(20);
               if(!records.isEmpty()) {
                   for (ConsumerRecord<String, String> record : records) {

                    String eOCJSONString = record.value();

                    logger.info("Received the records at consumer id:" +
consumerId +
                    ". Record topic:" + record.topic() +
                    ". Record partition:" + record.partition() +
                    ". Record offset id:" + record.offset());
                    logger.info("\n Record:" + eOCJSONString);

                    if (emailOCJSONString.startsWith("{")) {
                    OCBean ocBean = gson.fromJson(record.value(),
EOCBean.class);
                    executorServiceWorker.submit(new OCWorker(ocBean,
consumerId));
                    :
}

The problem occurs when I load test the application sending 30k of messages
(JSONS) from single producer and when I tried bringing down one of the
broker while consumer is consuming the messages. I could observe that many
of the messages are processed duplicate (~200-800). I repeated this
experiment a few times and always noticed that there are many messages
which are read duplicate by consumer thread. I tried by bringing one, two
brokers down.

Is it normal to happen?
Should I switch to manual offset commit than enabling auto commit?
Or should I manually assign the partition in program rather than let
brokers manage it?

Am I missing something very important here?

Also,
I observed that Kafka-Python had similar bug and has been fixed it in 0.9.2
(https://github.com/dpkp/kafka-python/issues/189), but I believe no such
issue reported for Java.

Thanks,

Reply via email to