Can you try running kafka-consumer-groups to see what it reports? Are the log end offsets and the lag still increasing for the partitions where processing is stuck?
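To make that check concrete: kafka-consumer-groups lists, per partition, the group's current offset, the log end offset, and the lag. Below is a rough sketch of how two such readings taken a few seconds apart could be compared. It uses plain Java with no Kafka dependency; `LagCheck`, `Offsets` and `stuckPartitions` are hypothetical names made up for this illustration, and the numbers in `main` are invented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka: compares two snapshots of the
// per-partition numbers that kafka-consumer-groups prints.
class LagCheck {

    // One row of a snapshot: the group's committed offset and the log end offset.
    static final class Offsets {
        final long committed;
        final long logEnd;

        Offsets(long committed, long logEnd) {
            this.committed = committed;
            this.logEnd = logEnd;
        }

        // Lag as reported by the tool: log end offset minus committed offset.
        long lag() {
            return logEnd - committed;
        }
    }

    // A partition looks stuck when producers kept appending (log end offset grew)
    // but the committed offset did not move between the two snapshots.
    static List<Integer> stuckPartitions(Map<Integer, Offsets> before,
                                         Map<Integer, Offsets> after) {
        List<Integer> stuck = new ArrayList<>();
        for (Map.Entry<Integer, Offsets> e : new TreeMap<>(after).entrySet()) {
            Offsets old = before.get(e.getKey());
            if (old == null) {
                continue; // partition missing from the first snapshot; cannot compare
            }
            Offsets now = e.getValue();
            boolean producing = now.logEnd > old.logEnd;
            boolean consuming = now.committed > old.committed;
            if (producing && !consuming) {
                stuck.add(e.getKey());
            }
        }
        return stuck;
    }

    public static void main(String[] args) {
        // Made-up numbers for illustration only.
        Map<Integer, Offsets> before = new TreeMap<>();
        before.put(0, new Offsets(100, 100));
        before.put(1, new Offsets(100, 100));
        Map<Integer, Offsets> after = new TreeMap<>();
        after.put(0, new Offsets(160, 160)); // consumer keeps up
        after.put(1, new Offsets(100, 160)); // log grows, commits do not
        System.out.println("Stuck partitions: " + stuckPartitions(before, after));
    }
}
```

If a partition shows up as stuck here while its log end offset keeps growing, the producers are fine and the consumer side (partition ownership, rebalances) is the place to look. If the log end offset is not growing either, the producer is the more likely suspect.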
Maybe the problem is on the producer side.

Regards
Sab

On Mon, 9 Jul 2018, 11:43 am Steve Tian, <steve.cs.t...@gmail.com> wrote:

> Are you sure your consumer was still the owner of your partitions?
>
> On Mon, Jul 9, 2018, 12:54 PM dev loper <spark...@gmail.com> wrote:
>
> > Hi Kafka Users,
> >
> > I am desperate to find an answer to this issue. I would like to know
> > whether my issue is due to having a single Kafka consumer. Where should
> > I look for answers to this issue? Is it something to do with the Kafka
> > broker? I am using Kafka version 0.11.01 for both the Kafka broker and
> > the client.
> >
> > Thanks
> >
> > Dev Loper
> >
> > On Fri, Jul 6, 2018 at 11:01 PM, dev loper <spark...@gmail.com> wrote:
> >
> > > Hi Kafka Streams Users,
> > >
> > > I have posted the same question on Stack Overflow, and if anybody
> > > could point me in the right direction it would be of great help.
> > >
> > > https://stackoverflow.com/questions/51214506/kafka-consumer-hung-on-certain-partitions-single-kafka-consumer
> > >
> > >
> > > On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com> wrote:
> > >
> > >> Hi Kafka Streams Users,
> > >>
> > >> In my test environment, I have three Kafka brokers and my topic has
> > >> 16 partitions. There are 16 IOT devices posting messages to Kafka.
> > >> I have a single system with one Kafka consumer which subscribes to
> > >> this topic. Each IOT device posts a message to Kafka every second,
> > >> and the messages are distributed uniformly. On each IOT device I am
> > >> printing the offset and partition to which the data was posted,
> > >> using the Kafka producer callback method. My consumer randomly stops
> > >> consuming messages from certain partitions while it keeps processing
> > >> records from the other partitions. I verified the IOT device logs
> > >> and I could see that the data is actually getting posted to the
> > >> partitions where the consumer has stopped consuming, and I am able
> > >> to see the offsets getting incremented for those partitions. There
> > >> are no exceptions or errors of any kind in the consumer, except that
> > >> I don't see any processing logs for the partitions which stopped
> > >> processing.
> > >>
> > >>
> > >> Below is pseudo code for my consumer, which closely resembles the
> > >> code I am using in my application.
> > >>
> > >> import java.util.Arrays;
> > >> import java.util.Properties;
> > >> import java.util.concurrent.atomic.AtomicBoolean;
> > >> import org.apache.kafka.clients.consumer.ConsumerConfig;
> > >> import org.apache.kafka.clients.consumer.ConsumerRecords;
> > >> import org.apache.kafka.clients.consumer.KafkaConsumer;
> > >> import org.apache.kafka.common.serialization.StringDeserializer;
> > >>
> > >> public class MyKafkaConumer extends Thread {
> > >>
> > >>     private static final AtomicBoolean running = new AtomicBoolean(true);
> > >>     // The value type produced by IOTDeserializer is not shown, so
> > >>     // Object is used here.
> > >>     private static final KafkaConsumer<String, Object> consumer;
> > >>     public static final MyKafkaConumer INSTANCE = new MyKafkaConumer();
> > >>
> > >>     static {
> > >>         Properties props = new Properties();
> > >>         props.put("bootstrap.servers",
> > >>                 "kafkaServer101:9092,kafkaServer102:9092,kafkaServer103:9092");
> > >>         props.put("group.id", "mykafa-group");
> > >>         props.put("enable.auto.commit", "true");
> > >>         props.put("auto.commit.interval.ms", "1000");
> > >>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >>                 StringDeserializer.class.getName());
> > >>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >>                 IOTDeserializer.class.getName());
> > >>         consumer = new KafkaConsumer<>(props);
> > >>         consumer.subscribe(Arrays.asList("mytopic"));
> > >>     }
> > >>
> > >>     private MyKafkaConumer() {
> > >>         super("MyKafkaConumer");
> > >>     }
> > >>
> > >>     public void run() {
> > >>         try {
> > >>             while (running.get()) {
> > >>                 ConsumerRecords<String, Object> records = consumer.poll(2000L);
> > >>                 records.forEach(record -> {
> > >>                     // The key is a String, so it is printed with %s.
> > >>                     System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
> > >>                             record.key(), record.value(),
> > >>                             record.partition(), record.offset());
> > >>                 });
> > >>             }
> > >>         } finally {
> > >>             consumer.close();
> > >>         }
> > >>     }
> > >>
> > >>     public static void main(String[] args) throws InterruptedException {
> > >>         MyKafkaConumer.INSTANCE.start();
> > >>         MyKafkaConumer.INSTANCE.join();
> > >>     }
> > >> }
> > >>
> > >> I only have a single consumer with a single thread running. What
> > >> could be the reason for the Kafka consumer to stop processing
> > >> certain partitions while processing continues for other partitions,
> > >> even though the producer is sending messages to the partitions where
> > >> it is stuck? Any help here is very much appreciated.
> > >>
> > >> Thanks
> > >> Dev Loper