Hi Kafka Users,

I am desperate to find an answer to this issue. I would like to know whether it is caused by having only a single Kafka consumer. Where should I look for answers? Does it have something to do with the Kafka broker? I am using Kafka version 0.11.01 for both the Kafka broker and the client.
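One way to narrow this down would be to sample, for each partition, the log end offset and the consumer's position twice, a few seconds apart, and flag the partitions where the end offset grew but the position did not move. Below is a minimal sketch of that comparison only. It assumes the two samples have already been collected into plain maps keyed by partition number (in a real consumer they could come from `consumer.endOffsets(...)` and `consumer.position(...)`). The class name `StalledPartitionCheck`, the method `findStalled`, and the sample numbers are all hypothetical and only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of the Kafka client API.
public class StalledPartitionCheck {

    /**
     * Returns, in ascending order, the partitions whose end offset grew between
     * the two samples while the consumer position stayed the same.
     * A partition with no recorded position is treated as having made no progress.
     */
    public static List<Integer> findStalled(Map<Integer, Long> endBefore,
                                            Map<Integer, Long> endAfter,
                                            Map<Integer, Long> posBefore,
                                            Map<Integer, Long> posAfter) {
        List<Integer> stalled = new ArrayList<>();
        // TreeMap gives a stable, sorted iteration order over partition numbers.
        for (Map.Entry<Integer, Long> e : new TreeMap<>(endAfter).entrySet()) {
            int partition = e.getKey();
            long produced = e.getValue() - endBefore.getOrDefault(partition, e.getValue());
            long consumed = posAfter.getOrDefault(partition, 0L)
                    - posBefore.getOrDefault(partition, 0L);
            if (produced > 0 && consumed == 0) {
                stalled.add(partition);
            }
        }
        return stalled;
    }

    public static void main(String[] args) {
        // Made-up sample data: partition 0 keeps up, partition 3 does not move.
        Map<Integer, Long> endBefore = new HashMap<>();
        Map<Integer, Long> endAfter = new HashMap<>();
        Map<Integer, Long> posBefore = new HashMap<>();
        Map<Integer, Long> posAfter = new HashMap<>();
        endBefore.put(0, 100L); endAfter.put(0, 110L);
        posBefore.put(0, 100L); posAfter.put(0, 110L);
        endBefore.put(3, 200L); endAfter.put(3, 210L);
        posBefore.put(3, 150L); posAfter.put(3, 150L);

        System.out.println("Stalled partitions: " + findStalled(endBefore, endAfter, posBefore, posAfter));
    }
}
```

If a partition shows up as stalled here, it may also be worth checking whether it is still listed in `consumer.assignment()` and whether it appears in `consumer.paused()`.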

Thanks
Dev Loper

On Fri, Jul 6, 2018 at 11:01 PM, dev loper <spark...@gmail.com> wrote:

> Hi Kafka Streams Users,
>
> I have posted the same question on Stack Overflow. If anybody could
> point me in the right direction, it would be a great help.
>
> https://stackoverflow.com/questions/51214506/kafka-consumer-hung-on-certain-partitions-single-kafka-consumer
>
> On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com> wrote:
>
>> Hi Kafka Streams Users,
>>
>> In my test environment I have three Kafka brokers and a topic with
>> 16 partitions. There are 16 IoT devices posting messages to Kafka,
>> and a single system with one Kafka consumer subscribed to this topic.
>> Each IoT device posts a message to Kafka every second, and the
>> messages are distributed uniformly. On each IoT device I print the
>> offset and partition the data was posted to, using the Kafka producer
>> callback method. My consumer randomly stops consuming messages from
>> certain partitions while it continues to process records from the
>> other partitions. I verified the IoT device logs and could see that
>> data is still being posted to the partitions where the consumer has
>> stopped consuming, and that the offsets for those partitions keep
>> incrementing. There are no exceptions or errors of any kind in the
>> consumer, except that I don't see any processing logs for the
>> partitions that stopped being processed.
>>
>> Below is pseudocode for my consumer, which closely resembles the
>> code I am using in my application.
>>
>> import java.util.Arrays;
>> import java.util.Properties;
>> import java.util.concurrent.atomic.AtomicBoolean;
>>
>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>> import org.apache.kafka.clients.consumer.ConsumerRecords;
>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>> import org.apache.kafka.common.serialization.StringDeserializer;
>>
>> public class MyKafkaConumer extends Thread {
>>
>>     private static final AtomicBoolean running = new AtomicBoolean(true);
>>     // The value type depends on IOTDeserializer (application class, not shown here).
>>     private static final KafkaConsumer<String, Object> consumer;
>>     public static final MyKafkaConumer INSTANCE = new MyKafkaConumer();
>>
>>     static {
>>         Properties props = new Properties();
>>         props.put("bootstrap.servers",
>>                 "kafkaServer101:9092,kafkaServer102:9092,kafkaServer103:9092");
>>         props.put("group.id", "mykafa-group");
>>         props.put("enable.auto.commit", "true");
>>         props.put("auto.commit.interval.ms", "1000");
>>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>                 StringDeserializer.class.getName());
>>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>                 IOTDeserializer.class.getName());
>>         consumer = new KafkaConsumer<>(props);
>>         consumer.subscribe(Arrays.asList("mytopic"));
>>     }
>>
>>     private MyKafkaConumer() {
>>         super("MyKafkaConumer");
>>     }
>>
>>     @Override
>>     public void run() {
>>         try {
>>             while (running.get()) {
>>                 // Wait up to 2 seconds for new records.
>>                 ConsumerRecords<String, Object> records = consumer.poll(2000L);
>>                 // The key is a String, so it is printed with %s rather than %d.
>>                 records.forEach(record ->
>>                         System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
>>                                 record.key(), record.value(),
>>                                 record.partition(), record.offset()));
>>             }
>>         } finally {
>>             consumer.close();
>>         }
>>     }
>>
>>     public static void main(String[] args) throws InterruptedException {
>>         MyKafkaConumer.INSTANCE.start();
>>         MyKafkaConumer.INSTANCE.join();
>>     }
>> }
>>
>> I only have a single consumer with a single thread running. What could
>> cause the Kafka consumer to stop processing certain partitions while
>> it keeps processing the others, even though the producer is still
>> sending messages to the partitions where it is stuck? Any help here
>> is very much appreciated.
>>
>> Thanks
>> Dev Loper