Hi Kafka Streams Users, I have posted the same question on stackoverflow and if anybody could point some directions it would be of great help.
https://stackoverflow.com/questions/51214506/kafka-consumer-hung-on-certain-partitions-single-kafka-consumer On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com> wrote: > Hi Kafka Streams Users, > > My test environment, I have three Kafka brokers and my topic is having > 16 partitions, there are 16 IOT Devices which are posting the messages to > Kafka. I have a single system with one Kafka Consumer which subscribes to > this topic. Each IOT devices are posting the message to Kafka every second > and its distributed uniformly. I am > printing the offset and partition to which the data posted using Kafka > Producer Call back method on these each IOT device . My consumer stops > consuming the messages from certain partitions randomly and at the same > time its processing records from other partitions. I actually verified the > IOT device logs and I could see that the data is actually getting posted to > the particular partitions where the consumer has stopped consuming and I am > able to see the offsets getting incremented for those partitions. There are > no exceptions or any error of any kind in the consumer except that I don't > see any processing logs for the partitions which stopped processing. > > > Below I have given my pseudo code for my consumer which almost resembles > the code which I am using in my application . > > public class MyKafkaConumer extends Thread { > > private static final AtomicBoolean running= new AtomicBoolean(true); > private static final KafkaConsumer consumer; > public static final MyKafkaConumer INSTANCE = new MyKafkaConumer(); > > static { > > Properties props = new Properties(); > props.put("bootstrap.servers", "kafkaServer101:9092, > kafkaServer102:9092,kafkaServer103:9092"); > props.put("group.id", "mykafa-group"); > props.put("enable.auto.commit", "true"); > props.put("auto.commit.interval.ms", "1000"); > props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > StringDeserializer.class.getName()); > props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > IOTDeserializer.class.getName()); > consumer = new KafkaConsumer(props); > consumer.subscribe(Arrays.asList("mytopic")); > > } > > private MyKafkaConumer() { > super("MyKafkaConumer"); > } > > public void run() { > > > try { > while (running.get()) { > ConsumerRecords records = consumer.poll(2000L); > records.forEach(record -> { > System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", > record.key(), record.value(), > record.partition(), record.offset()); > }); > > } > } finally { > consumer.close(); > } > > } > > public static void main(String[] args) throws InterruptedException { > MyKafkaConumer.INSTANCE.start(); > MyKafkaConumer.INSTANCE.join(); > } > } > > I only have a single Consumer with a single thread running . What could be > the reason for the Kafka Consumer to stop processing from certain > partitions while the processing is happening for other partitions even > though the producer is sending message to the partitions where it was > stuck ? Any help here is very much appreciated. > > Thanks > Dev Loper > > > > >