Hi Kafka Streams Users,

In my test environment I have three Kafka brokers and a topic with 16
partitions. 16 IoT devices post messages to Kafka, and a single system
runs one Kafka consumer that subscribes to this topic. Each IoT device
posts a message to Kafka every second, and the messages are distributed
uniformly across the partitions. On each IoT device I print the offset and
partition the data was posted to, using the Kafka producer callback
method. My consumer randomly stops consuming messages from certain
partitions while it continues to process records from the other
partitions. I checked the IoT device logs and can see that data is still
being posted to the partitions the consumer has stopped consuming from,
and the offsets for those partitions keep incrementing. There are no
exceptions or errors of any kind in the consumer; I simply no longer see
any processing logs for the partitions that stopped.


Below is pseudocode for my consumer, which closely resembles the code I am
using in my application.

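import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyKafkaConumer extends Thread {

    private static final AtomicBoolean running = new AtomicBoolean(true);
    // The value type is whatever IOTDeserializer (application class, not
    // shown here) produces, so it is declared as Object in this sketch.
    private static final KafkaConsumer<String, Object> consumer;
    public static final MyKafkaConumer INSTANCE = new MyKafkaConumer();

    static {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "kafkaServer101:9092,kafkaServer102:9092,kafkaServer103:9092");
        props.put("group.id", "mykafa-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                IOTDeserializer.class.getName());
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mytopic"));
    }

    private MyKafkaConumer() {
        super("MyKafkaConumer");
    }

    @Override
    public void run() {
        try {
            // Poll until the running flag is cleared.
            while (running.get()) {
                ConsumerRecords<String, Object> records = consumer.poll(2000L);
                // The key is a String, so it is printed with %s rather than %d.
                records.forEach(record ->
                        System.out.printf("Consumer Record:(%s, %s, %d, %d)%n",
                                record.key(), record.value(),
                                record.partition(), record.offset()));
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyKafkaConumer.INSTANCE.start();
        MyKafkaConumer.INSTANCE.join();
    }
}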
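
To pin down from the consumer side exactly which partitions go quiet, and
at what offset, the poll loop could feed a small tracker like the one
below. This is only a minimal sketch under stated assumptions:
PartitionStallTracker and its methods are hypothetical names and are not
part of the Kafka client API, and the stall threshold is an arbitrary
value chosen by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of the Kafka client API): remembers when
// each partition last delivered a record so quiet partitions can be logged.
class PartitionStallTracker {
    // TreeMap keeps partitions in ascending order for stable log output.
    private final Map<Integer, Long> lastSeenMs = new TreeMap<>();
    private final Map<Integer, Long> lastOffset = new TreeMap<>();
    private final long stallThresholdMs;

    PartitionStallTracker(long stallThresholdMs) {
        this.stallThresholdMs = stallThresholdMs;
    }

    // Call once per consumed record with its partition, its offset and
    // the current wall-clock time in milliseconds.
    void recordSeen(int partition, long offset, long nowMs) {
        lastSeenMs.put(partition, nowMs);
        lastOffset.put(partition, offset);
    }

    // Partitions that delivered at least one record in the past but
    // nothing within the last stallThresholdMs milliseconds.
    List<Integer> stalledPartitions(long nowMs) {
        List<Integer> stalled = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : lastSeenMs.entrySet()) {
            if (nowMs - entry.getValue() > stallThresholdMs) {
                stalled.add(entry.getKey());
            }
        }
        return stalled;
    }

    // Last offset seen for a partition, or null if none was seen yet.
    Long lastOffset(int partition) {
        return lastOffset.get(partition);
    }
}
```

Inside the poll loop this would be called as
tracker.recordSeen(record.partition(), record.offset(),
System.currentTimeMillis()) for each record, with
tracker.stalledPartitions(System.currentTimeMillis()) logged after each
poll. A partition that has never delivered a record is not reported by
this sketch.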

I have only a single consumer running with a single thread. What could
cause the Kafka consumer to stop processing certain partitions while it
keeps processing others, even though the producers are still sending
messages to the partitions where it is stuck? Any help here is very much
appreciated.

Thanks
Dev Loper
