Hi Kafka Streams Users, My test environment, I have three Kafka brokers and my topic is having 16 partitions, there are 16 IOT Devices which are posting the messages to Kafka. I have a single system with one Kafka Consumer which subscribes to this topic. Each IOT devices are posting the message to Kafka every second and its distributed uniformly. I am printing the offset and partition to which the data posted using Kafka Producer Call back method on these each IOT device . My consumer stops consuming the messages from certain partitions randomly and at the same time its processing records from other partitions. I actually verified the IOT device logs and I could see that the data is actually getting posted to the particular partitions where the consumer has stopped consuming and I am able to see the offsets getting incremented for those partitions. There are no exceptions or any error of any kind in the consumer except that I don't see any processing logs for the partitions which stopped processing.
Below I have given my pseudo code for my consumer which almost resembles the code which I am using in my application . public class MyKafkaConumer extends Thread { private static final AtomicBoolean running= new AtomicBoolean(true); private static final KafkaConsumer consumer; public static final MyKafkaConumer INSTANCE = new MyKafkaConumer(); static { Properties props = new Properties(); props.put("bootstrap.servers", "kafkaServer101:9092,kafkaServer102:9092,kafkaServer103:9092"); props.put("group.id", "mykafa-group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IOTDeserializer.class.getName()); consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("mytopic")); } private MyKafkaConumer() { super("MyKafkaConumer"); } public void run() { try { while (running.get()) { ConsumerRecords records = consumer.poll(2000L); records.forEach(record -> { System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset()); }); } } finally { consumer.close(); } } public static void main(String[] args) throws InterruptedException { MyKafkaConumer.INSTANCE.start(); MyKafkaConumer.INSTANCE.join(); } } I only have a single Consumer with a single thread running . What could be the reason for the Kafka Consumer to stop processing from certain partitions while the processing is happening for other partitions even though the producer is sending message to the partitions where it was stuck ? Any help here is very much appreciated. Thanks Dev Loper