The processing does not take much time: the application reads from Kafka and immediately puts the records into an in-memory queue, and another thread persists these records to the DB.
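To make the hand-off concrete, here is a minimal sketch of the pattern I described, not the actual application code. The class name QueueHandoffSketch, the Reading type, the bounded capacity and the in-memory list standing in for the DB are all assumptions made for illustration; the real poll loop would call handOff() once per consumed record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the polling thread hands records to a queue,
// and a separate persister thread drains the queue in batches.
final class QueueHandoffSketch {

    // Hypothetical stand-in for a consumed Kafka record.
    static final class Reading {
        final int partition;
        final long offset;
        final String payload;

        Reading(int partition, long offset, String payload) {
            this.partition = partition;
            this.offset = offset;
            this.payload = payload;
        }
    }

    private final BlockingQueue<Reading> queue;
    // Stand-in for the database; a real persister would issue inserts here.
    private final List<Reading> persisted = new ArrayList<>();

    QueueHandoffSketch(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Called from the polling thread. Waits at most timeoutMs for space,
    // so a slow persister cannot block the poll loop indefinitely.
    boolean handOff(Reading reading, long timeoutMs) {
        try {
            return queue.offer(reading, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    // Called from the persister thread. Drains up to maxBatch queued
    // records and "persists" them; returns how many were written.
    int persistBatch(int maxBatch) {
        List<Reading> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatch);
        synchronized (persisted) {
            persisted.addAll(batch);
        }
        return batch.size();
    }

    int persistedCount() {
        synchronized (persisted) {
            return persisted.size();
        }
    }

    int queued() {
        return queue.size();
    }

    public static void main(String[] args) {
        QueueHandoffSketch sketch = new QueueHandoffSketch(2);
        sketch.handOff(new Reading(0, 100L, "a"), 10);
        sketch.handOff(new Reading(1, 200L, "b"), 10);
        // The queue is full now, so this hand-off times out and is rejected.
        boolean accepted = sketch.handOff(new Reading(2, 300L, "c"), 10);
        int written = sketch.persistBatch(10);
        System.out.println("accepted=" + accepted + " written=" + written);
    }
}
```

Logging the partition and offset whenever handOff() returns false would show whether records are being dropped between the poll loop and the DB thread, as opposed to never being returned by poll() at all.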

On Mon, Jul 9, 2018 at 12:44 PM, dev loper <spark...@gmail.com> wrote:

> Hi Steve,
>
> When the issue occurs I can see the lag shown for the corresponding
> partitions in the output of the kafka-consumer-groups command. I will
> try changing the log level to DEBUG for the Kafka consumer and I will
> post if anything seems to be outside the normal range.
>
> Thank you
> Dev Loper
>
> On Mon, Jul 9, 2018 at 12:24 PM, Steve Tian <steve.cs.t...@gmail.com>
> wrote:
>
>> Also, have you tried enabling DEBUG level logging for KafkaConsumer?
>> How long does it take to process records in a single batch?
>>
>> On Mon, Jul 9, 2018, 2:51 PM Sabarish Sasidharan
>> <sabarish....@gmail.com> wrote:
>>
>> > Can you try executing kafka-consumer-groups to see what it says?
>> > Are your log offsets increasing and the lag increasing when
>> > processing for some of your partitions is stuck?
>> >
>> > Maybe the problem is on the producer side.
>> >
>> > Regards
>> > Sab
>> >
>> > On Mon, 9 Jul 2018, 11:43 am Steve Tian, <steve.cs.t...@gmail.com>
>> > wrote:
>> >
>> > > Are you sure your consumer was still the owner of your partitions?
>> > >
>> > > On Mon, Jul 9, 2018, 12:54 PM dev loper <spark...@gmail.com> wrote:
>> > >
>> > > > Hi Kafka Users,
>> > > >
>> > > > I am desperate to find an answer to this issue. I would like to
>> > > > know whether my issue is due to having a single Kafka consumer.
>> > > > Where should I look for answers to this issue? Is it something
>> > > > to do with the Kafka broker? I am using Kafka version 0.11.01
>> > > > for both the Kafka broker and client.
>> > > >
>> > > > Thanks
>> > > >
>> > > > Dev Loper
>> > > >
>> > > > On Fri, Jul 6, 2018 at 11:01 PM, dev loper <spark...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Hi Kafka Streams Users,
>> > > > >
>> > > > > I have posted the same question on Stack Overflow, and if
>> > > > > anybody could point me in some direction it would be of great
>> > > > > help.
>> > > > >
>> > > > > https://stackoverflow.com/questions/51214506/kafka-consumer-hung-on-certain-partitions-single-kafka-consumer
>> > > > >
>> > > > > On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > >> Hi Kafka Streams Users,
>> > > > >>
>> > > > >> In my test environment I have three Kafka brokers and a topic
>> > > > >> with 16 partitions. There are 16 IoT devices posting messages
>> > > > >> to Kafka. I have a single system with one Kafka consumer which
>> > > > >> subscribes to this topic. Each IoT device posts a message to
>> > > > >> Kafka every second, and the messages are distributed uniformly.
>> > > > >> On each IoT device I print the offset and partition to which
>> > > > >> the data was posted, using the Kafka producer callback method.
>> > > > >> My consumer randomly stops consuming messages from certain
>> > > > >> partitions, while at the same time it keeps processing records
>> > > > >> from other partitions. I verified the IoT device logs and could
>> > > > >> see that the data is actually getting posted to the particular
>> > > > >> partitions where the consumer has stopped consuming, and I am
>> > > > >> able to see the offsets getting incremented for those
>> > > > >> partitions. There are no exceptions or errors of any kind in
>> > > > >> the consumer, except that I don't see any processing logs for
>> > > > >> the partitions which stopped processing.
>> > > > >>
>> > > > >> Below is the pseudo code for my consumer, which closely
>> > > > >> resembles the code I am using in my application.
>> > > > >>
>> > > > >> import java.util.Arrays;
>> > > > >> import java.util.Properties;
>> > > > >> import java.util.concurrent.atomic.AtomicBoolean;
>> > > > >> import org.apache.kafka.clients.consumer.ConsumerConfig;
>> > > > >> import org.apache.kafka.clients.consumer.ConsumerRecords;
>> > > > >> import org.apache.kafka.clients.consumer.KafkaConsumer;
>> > > > >> import org.apache.kafka.common.serialization.StringDeserializer;
>> > > > >>
>> > > > >> public class MyKafkaConumer extends Thread {
>> > > > >>
>> > > > >>     private static final AtomicBoolean running = new AtomicBoolean(true);
>> > > > >>     // Value type is whatever IOTDeserializer produces.
>> > > > >>     private static final KafkaConsumer<String, Object> consumer;
>> > > > >>     public static final MyKafkaConumer INSTANCE = new MyKafkaConumer();
>> > > > >>
>> > > > >>     static {
>> > > > >>         Properties props = new Properties();
>> > > > >>         props.put("bootstrap.servers",
>> > > > >>             "kafkaServer101:9092,kafkaServer102:9092,kafkaServer103:9092");
>> > > > >>         props.put("group.id", "mykafa-group");
>> > > > >>         props.put("enable.auto.commit", "true");
>> > > > >>         props.put("auto.commit.interval.ms", "1000");
>> > > > >>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>> > > > >>             StringDeserializer.class.getName());
>> > > > >>         // IOTDeserializer is an application class, not shown here.
>> > > > >>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>> > > > >>             IOTDeserializer.class.getName());
>> > > > >>         consumer = new KafkaConsumer<>(props);
>> > > > >>         consumer.subscribe(Arrays.asList("mytopic"));
>> > > > >>     }
>> > > > >>
>> > > > >>     private MyKafkaConumer() {
>> > > > >>         super("MyKafkaConumer");
>> > > > >>     }
>> > > > >>
>> > > > >>     @Override
>> > > > >>     public void run() {
>> > > > >>         try {
>> > > > >>             while (running.get()) {
>> > > > >>                 // Wait up to 2 seconds for new records.
>> > > > >>                 ConsumerRecords<String, Object> records = consumer.poll(2000L);
>> > > > >>                 // The key is a String, so it is printed with %s.
>> > > > >>                 records.forEach(record ->
>> > > > >>                     System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
>> > > > >>                         record.key(), record.value(),
>> > > > >>                         record.partition(), record.offset()));
>> > > > >>             }
>> > > > >>         } finally {
>> > > > >>             consumer.close();
>> > > > >>         }
>> > > > >>     }
>> > > > >>
>> > > > >>     public static void main(String[] args) throws InterruptedException {
>> > > > >>         MyKafkaConumer.INSTANCE.start();
>> > > > >>         MyKafkaConumer.INSTANCE.join();
>> > > > >>     }
>> > > > >> }
>> > > > >>
>> > > > >> I only have a single consumer with a single thread running. What
>> > > > >> could be the reason for the Kafka consumer to stop processing
>> > > > >> certain partitions while processing continues for other
>> > > > >> partitions, even though the producer is sending messages to the
>> > > > >> partitions where it was stuck? Any help here is very much
>> > > > >> appreciated.
>> > > > >>
>> > > > >> Thanks
>> > > > >> Dev Loper