Are you sure your consumer was still the owner of your partitions?

On Mon, Jul 9, 2018, 12:54 PM dev loper <spark...@gmail.com> wrote:
> Hi Kafka Users,
>
> I am desperate to find an answer to this issue. I would like to know
> whether my issue is due to having a single Kafka consumer. Where should I
> look for answers to this issue? Does it have something to do with the
> Kafka broker? I am using Kafka version 0.11.0.1 for both the Kafka broker
> and the client.
>
> Thanks
>
> Dev Loper
>
> On Fri, Jul 6, 2018 at 11:01 PM, dev loper <spark...@gmail.com> wrote:
>
> > Hi Kafka Streams Users,
> >
> > I have posted the same question on Stack Overflow, and if anybody could
> > point me in the right direction it would be of great help.
> >
> > https://stackoverflow.com/questions/51214506/kafka-consumer-hung-on-certain-partitions-single-kafka-consumer
> >
> > On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com> wrote:
> >
> >> Hi Kafka Streams Users,
> >>
> >> In my test environment I have three Kafka brokers and a topic with
> >> 16 partitions. There are 16 IoT devices posting messages to Kafka, and
> >> a single system with one Kafka consumer that subscribes to this topic.
> >> Each IoT device posts a message to Kafka every second, and the messages
> >> are distributed uniformly across the partitions. On each IoT device I
> >> print the offset and partition to which the data was posted, using the
> >> Kafka producer callback. My consumer randomly stops consuming messages
> >> from certain partitions while it continues to process records from the
> >> other partitions. I verified the IoT device logs and could see that
> >> data is still being posted to the partitions where the consumer has
> >> stopped consuming, and that the offsets for those partitions keep
> >> incrementing. There are no exceptions or errors of any kind in the
> >> consumer, except that I don't see any processing logs for the
> >> partitions that stopped processing.
> >>
> >> Below is pseudocode for my consumer, which closely resembles the code
> >> I am using in my application.
> >>
> >> import java.util.Arrays;
> >> import java.util.Properties;
> >> import java.util.concurrent.atomic.AtomicBoolean;
> >> import org.apache.kafka.clients.consumer.ConsumerConfig;
> >> import org.apache.kafka.clients.consumer.ConsumerRecords;
> >> import org.apache.kafka.clients.consumer.KafkaConsumer;
> >> import org.apache.kafka.common.serialization.StringDeserializer;
> >>
> >> public class MyKafkaConumer extends Thread {
> >>
> >>     private static final AtomicBoolean running = new AtomicBoolean(true);
> >>     // The value type is whatever IOTDeserializer (an application class) produces.
> >>     private static final KafkaConsumer<String, Object> consumer;
> >>     public static final MyKafkaConumer INSTANCE = new MyKafkaConumer();
> >>
> >>     static {
> >>         Properties props = new Properties();
> >>         props.put("bootstrap.servers",
> >>                 "kafkaServer101:9092,kafkaServer102:9092,kafkaServer103:9092");
> >>         props.put("group.id", "mykafa-group");
> >>         props.put("enable.auto.commit", "true");
> >>         props.put("auto.commit.interval.ms", "1000");
> >>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >>                 StringDeserializer.class.getName());
> >>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >>                 IOTDeserializer.class.getName());
> >>         consumer = new KafkaConsumer<>(props);
> >>         consumer.subscribe(Arrays.asList("mytopic"));
> >>     }
> >>
> >>     private MyKafkaConumer() {
> >>         super("MyKafkaConumer");
> >>     }
> >>
> >>     @Override
> >>     public void run() {
> >>         try {
> >>             while (running.get()) {
> >>                 ConsumerRecords<String, Object> records = consumer.poll(2000L);
> >>                 // Keys are Strings (StringDeserializer), so format them with %s.
> >>                 records.forEach(record ->
> >>                         System.out.printf("Consumer Record:(%s, %s, %d, %d)%n",
> >>                                 record.key(), record.value(),
> >>                                 record.partition(), record.offset()));
> >>             }
> >>         } finally {
> >>             consumer.close();
> >>         }
> >>     }
> >>
> >>     public static void main(String[] args) throws InterruptedException {
> >>         MyKafkaConumer.INSTANCE.start();
> >>         MyKafkaConumer.INSTANCE.join();
> >>     }
> >> }
> >>
> >> I only have a single consumer with a single thread running. What could
> >> be the reason for the Kafka consumer to stop processing certain
> >> partitions while processing continues for other partitions, even
> >> though the producer is sending messages to the partitions where it is
> >> stuck?
> >> Any help here is very much appreciated.
> >>
> >> Thanks
> >> Dev Loper
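
To make the ownership question at the top of this thread concrete, here is a minimal sketch of one way to check it from inside the poll loop. `PartitionWatch` and its methods are hypothetical names, not part of the Kafka client, and the sketch uses only the JDK. In a real consumer you would feed it `record.partition()` for each polled record and the partition numbers from `consumer.assignment()`; passing a `ConsumerRebalanceListener` to `subscribe` is another way to log when partitions are revoked or assigned. The 5-second silence threshold in the demo is an arbitrary assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not a Kafka class): remembers when each partition
// last delivered a record so that silent or unowned partitions can be logged.
final class PartitionWatch {
    private final Map<Integer, Long> lastSeenMillis = new HashMap<>();

    // Call once per polled record, e.g. recordSeen(record.partition(), now).
    void recordSeen(int partition, long nowMillis) {
        lastSeenMillis.put(partition, nowMillis);
    }

    // Partitions of the topic that this consumer does not currently own.
    // `assigned` is assumed to be derived from consumer.assignment().
    static Set<Integer> notOwned(int partitionCount, Set<Integer> assigned) {
        Set<Integer> missing = new TreeSet<>();
        for (int p = 0; p < partitionCount; p++) {
            if (!assigned.contains(p)) {
                missing.add(p);
            }
        }
        return missing;
    }

    // Owned partitions that have delivered nothing for more than
    // maxSilenceMillis, including those that never delivered a record.
    Set<Integer> silent(Set<Integer> assigned, long nowMillis, long maxSilenceMillis) {
        Set<Integer> result = new TreeSet<>();
        for (int p : assigned) {
            Long last = lastSeenMillis.get(p);
            if (last == null || nowMillis - last > maxSilenceMillis) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        PartitionWatch watch = new PartitionWatch();
        watch.recordSeen(0, 1_000L);
        watch.recordSeen(1, 9_000L);
        Set<Integer> assigned = new TreeSet<>(Arrays.asList(0, 1, 2));

        // Partition 3 exists in this 4-partition demo but is not assigned.
        System.out.println("not owned: " + notOwned(4, assigned));               // [3]
        // Partition 0 is stale and partition 2 never delivered anything.
        System.out.println("silent: " + watch.silent(assigned, 10_000L, 5_000L)); // [0, 2]
    }
}
```

If a stuck partition shows up as not owned, the consumer probably lost it in a rebalance. If it shows up as owned but silent, the problem is more likely on the fetch side.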