Hi Steve, When the issue occurs I could see the lag shown for the corresponding partitions in the output of kafka-consumer-groups command . I will try changing the log level to debug for kafka consumer and I will post if anything seems to be outside the normal range
Thank you Dev Loper On Mon, Jul 9, 2018 at 12:24 PM, Steve Tian <steve.cs.t...@gmail.com> wrote: > Also, have you tried to enabled DEBUG level logging for KafkaConsumer? How > long does it take to process records in a single batch? > > On Mon, Jul 9, 2018, 2:51 PM Sabarish Sasidharan <sabarish....@gmail.com> > wrote: > > > Can you try executing kafka-consumer-groups to see what it says? Are your > > log offsets increasing and the lag increasing when processing for some of > > your partitions are stuck? > > > > May be the problem is in the producer side. > > > > Regards > > Sab > > > > > > On Mon, 9 Jul 2018, 11:43 am Steve Tian, <steve.cs.t...@gmail.com> > wrote: > > > > > Are you sure your consumer was still the owner of your partitions? > > > > > > On Mon, Jul 9, 2018, 12:54 PM dev loper <spark...@gmail.com> wrote: > > > > > > > Hi Kafka Users, > > > > > > > > I am desperate to find an answer to this issue. I would like to know > > > > whether my issue is due to a Single Kafka Consumer ? Where should I > > look > > > > for answers for this issue? Is it something do with the Kafka Broker > > ? I > > > > am using Kafka version 0.11.01 version for both Kafka broker an > client > > . > > > > > > > > Thanks > > > > > > > > Dev Loper > > > > > > > > On Fri, Jul 6, 2018 at 11:01 PM, dev loper <spark...@gmail.com> > wrote: > > > > > > > > > Hi Kafka Streams Users, > > > > > > > > > > I have posted the same question on stackoverflow and if anybody > could > > > > > point some directions it would be of great help. > > > > > > > > > > https://stackoverflow.com/questions/51214506/kafka- > > > > > consumer-hung-on-certain-partitions-single-kafka-consumer > > > > > > > > > > > > > > > On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com> > > wrote: > > > > > > > > > >> Hi Kafka Streams Users, > > > > >> > > > > >> My test environment, I have three Kafka brokers and my topic is > > > having > > > > >> 16 partitions, there are 16 IOT Devices which are posting the > > messages > > > > to > > > > >> Kafka. I have a single system with one Kafka Consumer which > > subscribes > > > > to > > > > >> this topic. Each IOT devices are posting the message to Kafka > every > > > > second > > > > >> and its distributed uniformly. I am > > > > >> printing the offset and partition to which the data posted using > > Kafka > > > > >> Producer Call back method on these each IOT device . My consumer > > stops > > > > >> consuming the messages from certain partitions randomly and at the > > > same > > > > >> time its processing records from other partitions. I actually > > verified > > > > the > > > > >> IOT device logs and I could see that the data is actually getting > > > > posted to > > > > >> the particular partitions where the consumer has stopped consuming > > and > > > > I am > > > > >> able to see the offsets getting incremented for those partitions. > > > There > > > > are > > > > >> no exceptions or any error of any kind in the consumer except > that I > > > > don't > > > > >> see any processing logs for the partitions which stopped > processing. > > > > >> > > > > >> > > > > >> Below I have given my pseudo code for my consumer which almost > > > > resembles > > > > >> the code which I am using in my application . > > > > >> > > > > >> public class MyKafkaConumer extends Thread { > > > > >> > > > > >> private static final AtomicBoolean running= new > > > AtomicBoolean(true); > > > > >> private static final KafkaConsumer consumer; > > > > >> public static final MyKafkaConumer INSTANCE = new > > MyKafkaConumer(); > > > > >> > > > > >> static { > > > > >> > > > > >> Properties props = new Properties(); > > > > >> props.put("bootstrap.servers", > > > "kafkaServer101:9092,kafkaServ > > > > >> er102:9092,kafkaServer103:9092"); > > > > >> props.put("group.id", "mykafa-group"); > > > > >> props.put("enable.auto.commit", "true"); > > > > >> props.put("auto.commit.interval.ms", "1000"); > > > > >> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > > > > >> StringDeserializer.class.getName()); > > > > >> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_ > CONFIG, > > > > >> IOTDeserializer.class.getName()); > > > > >> consumer = new KafkaConsumer(props); > > > > >> consumer.subscribe(Arrays.asList("mytopic")); > > > > >> > > > > >> } > > > > >> > > > > >> private MyKafkaConumer() { > > > > >> super("MyKafkaConumer"); > > > > >> } > > > > >> > > > > >> public void run() { > > > > >> > > > > >> > > > > >> try { > > > > >> while (running.get()) { > > > > >> ConsumerRecords records = consumer.poll(2000L); > > > > >> records.forEach(record -> { > > > > >> System.out.printf("Consumer Record:(%d, %s, %d, > > > %d)\n", > > > > >> record.key(), record.value(), > > > > >> record.partition(), record.offset()); > > > > >> }); > > > > >> > > > > >> } > > > > >> } finally { > > > > >> consumer.close(); > > > > >> } > > > > >> > > > > >> } > > > > >> > > > > >> public static void main(String[] args) throws > > InterruptedException > > > { > > > > >> MyKafkaConumer.INSTANCE.start(); > > > > >> MyKafkaConumer.INSTANCE.join(); > > > > >> } > > > > >> } > > > > >> > > > > >> I only have a single Consumer with a single thread running . What > > > could > > > > >> be the reason for the Kafka Consumer to stop processing from > certain > > > > >> partitions while the processing is happening for other partitions > > even > > > > >> though the producer is sending message to the partitions where it > > was > > > > >> stuck ? Any help here is very much appreciated. > > > > >> > > > > >> Thanks > > > > >> Dev Loper > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > > > > > > > > > > > >