Hi Steve,

Yes, I am sure my consumer was the only owner of the partitions. The partitions recover at random intervals, and there is only a single consumer running in the entire environment. But since the data is time-critical, the stalls are leading to data loss. I want to dig deeper and understand why this issue persists and what could be causing it. Any pointers on how to debug this issue would be very helpful.
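As a starting point for debugging, one option is to record when each partition last delivered a record, so that a stalled partition shows up in the consumer logs with a timestamp and its last offset. The following is only a sketch under assumptions: `PartitionStallDetector` and its methods are hypothetical names introduced here for illustration. They are not part of the Kafka client and not part of my application.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: remembers when each partition last delivered a record. */
class PartitionStallDetector {
    private final Map<Integer, Long> lastSeenMillis = new HashMap<>();
    private final Map<Integer, Long> lastOffset = new HashMap<>();
    private final long stallThresholdMillis;

    PartitionStallDetector(long stallThresholdMillis) {
        this.stallThresholdMillis = stallThresholdMillis;
    }

    /** Call once per consumed record with its partition, offset and the current time. */
    void recordSeen(int partition, long offset, long nowMillis) {
        lastSeenMillis.put(partition, nowMillis);
        lastOffset.put(partition, offset);
    }

    /** Partitions in 0..partitionCount-1 that delivered nothing within the threshold. */
    List<Integer> stalledPartitions(int partitionCount, long nowMillis) {
        List<Integer> stalled = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            Long seen = lastSeenMillis.get(p);
            if (seen == null || nowMillis - seen > stallThresholdMillis) {
                stalled.add(p);
            }
        }
        return stalled;
    }

    /** Last offset consumed from the partition, or null if none was seen yet. */
    Long lastOffsetFor(int partition) {
        return lastOffset.get(partition);
    }
}
```

In the poll loop this would be fed with `record.partition()`, `record.offset()` and `System.currentTimeMillis()` for every record, and `stalledPartitions(16, now)` would be logged after each poll. Comparing the logged last offsets with the offsets printed by the producer callbacks on the IOT devices should show exactly when, and at which offset, a partition stopped being consumed.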
On Mon, Jul 9, 2018 at 11:43 AM, Steve Tian <steve.cs.t...@gmail.com> wrote:

> Are you sure your consumer was still the owner of your partitions?
>
> On Mon, Jul 9, 2018, 12:54 PM dev loper <spark...@gmail.com> wrote:
>
> > Hi Kafka Users,
> >
> > I am desperate to find an answer to this issue. I would like to know
> > whether my issue is due to having a single Kafka consumer. Where should
> > I look for answers to this issue? Does it have something to do with the
> > Kafka broker? I am using Kafka version 0.11.01 for both the Kafka
> > broker and the client.
> >
> > Thanks
> >
> > Dev Loper
> >
> > On Fri, Jul 6, 2018 at 11:01 PM, dev loper <spark...@gmail.com> wrote:
> >
> > > Hi Kafka Streams Users,
> > >
> > > I have posted the same question on Stack Overflow, and if anybody
> > > could point me in the right direction it would be of great help.
> > >
> > > https://stackoverflow.com/questions/51214506/kafka-consumer-hung-on-certain-partitions-single-kafka-consumer
> > >
> > > On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com> wrote:
> > >
> > >> Hi Kafka Streams Users,
> > >>
> > >> In my test environment, I have three Kafka brokers and my topic has
> > >> 16 partitions. There are 16 IOT devices which post messages to
> > >> Kafka. I have a single system with one Kafka consumer which
> > >> subscribes to this topic. Each IOT device posts a message to Kafka
> > >> every second, and the messages are distributed uniformly across the
> > >> partitions. On each IOT device I print the offset and partition to
> > >> which the data was posted, using the Kafka producer callback method.
> > >> My consumer randomly stops consuming messages from certain
> > >> partitions while it keeps processing records from the other
> > >> partitions. I verified the IOT device logs and could see that the
> > >> data is actually being posted to the partitions where the consumer
> > >> has stopped consuming, and I can see the offsets being incremented
> > >> for those partitions. There are no exceptions or errors of any kind
> > >> in the consumer, except that I don't see any processing logs for
> > >> the partitions which stopped processing.
> > >>
> > >> Below is pseudo code for my consumer, which closely resembles the
> > >> code I am using in my application.
> > >>
> > >> public class MyKafkaConumer extends Thread {
> > >>
> > >>     private static final AtomicBoolean running = new AtomicBoolean(true);
> > >>     private static final KafkaConsumer consumer;
> > >>     public static final MyKafkaConumer INSTANCE = new MyKafkaConumer();
> > >>
> > >>     static {
> > >>         Properties props = new Properties();
> > >>         props.put("bootstrap.servers",
> > >>                 "kafkaServer101:9092,kafkaServer102:9092,kafkaServer103:9092");
> > >>         props.put("group.id", "mykafa-group");
> > >>         props.put("enable.auto.commit", "true");
> > >>         props.put("auto.commit.interval.ms", "1000");
> > >>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >>                 StringDeserializer.class.getName());
> > >>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >>                 IOTDeserializer.class.getName());
> > >>         consumer = new KafkaConsumer(props);
> > >>         consumer.subscribe(Arrays.asList("mytopic"));
> > >>     }
> > >>
> > >>     private MyKafkaConumer() {
> > >>         super("MyKafkaConumer");
> > >>     }
> > >>
> > >>     public void run() {
> > >>         try {
> > >>             while (running.get()) {
> > >>                 ConsumerRecords records = consumer.poll(2000L);
> > >>                 records.forEach(record -> {
> > >>                     // Keys are Strings (StringDeserializer), so use %s, not %d.
> > >>                     System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
> > >>                             record.key(), record.value(),
> > >>                             record.partition(), record.offset());
> > >>                 });
> > >>             }
> > >>         } finally {
> > >>             consumer.close();
> > >>         }
> > >>     }
> > >>
> > >>     public static void main(String[] args) throws InterruptedException {
> > >>         MyKafkaConumer.INSTANCE.start();
> > >>         MyKafkaConumer.INSTANCE.join();
> > >>     }
> > >> }
> > >>
> > >> I only have a single consumer with a single thread running. What
> > >> could be the reason for the Kafka consumer to stop processing
> > >> certain partitions while processing continues for other partitions,
> > >> even though the producer is still sending messages to the partitions
> > >> where it is stuck? Any help here is very much appreciated.
> > >>
> > >> Thanks
> > >> Dev Loper