Hi Kafka Streams Users,

I have posted the same question on stackoverflow and if anybody could point
some directions  it would be of great help.

https://stackoverflow.com/questions/51214506/kafka-consumer-hung-on-certain-partitions-single-kafka-consumer


On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com> wrote:

> Hi Kafka Streams Users,
>
>  My test environment, I have three Kafka brokers and my topic is having
> 16 partitions, there are 16 IOT Devices which are posting the messages to
> Kafka. I have a single system with one Kafka Consumer which subscribes to
> this topic. Each IOT devices are posting  the message to Kafka every second
> and its distributed uniformly. I am
> printing the offset and partition to which the data posted using Kafka
> Producer Call back method on these each IOT device . My consumer stops
> consuming the messages from certain partitions randomly and at the same
> time its processing records from other partitions. I actually verified the
> IOT device logs and I could see that the data is actually getting posted to
> the particular partitions where the consumer has stopped consuming and I am
> able to see the offsets getting incremented for those partitions. There are
> no exceptions or any error of any kind in the consumer except that I don't
> see any processing logs for the partitions which stopped processing.
>
>
> Below I have given my pseudo code for my consumer  which almost resembles
> the code which I am using in my application .
>
> public class MyKafkaConumer extends Thread {
>
>    private static final AtomicBoolean running= new AtomicBoolean(true);
>    private static final KafkaConsumer consumer;
>    public static final MyKafkaConumer INSTANCE = new MyKafkaConumer();
>
>    static {
>
>           Properties props = new Properties();
>           props.put("bootstrap.servers", "kafkaServer101:9092,
> kafkaServer102:9092,kafkaServer103:9092");
>           props.put("group.id", "mykafa-group");
>           props.put("enable.auto.commit", "true");
>           props.put("auto.commit.interval.ms", "1000");
>           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                   StringDeserializer.class.getName());
>           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                   IOTDeserializer.class.getName());
>           consumer = new KafkaConsumer(props);
>           consumer.subscribe(Arrays.asList("mytopic"));
>
>        }
>
>      private MyKafkaConumer() {
>           super("MyKafkaConumer");
>     }
>
>    public void run() {
>
>
>        try {
>         while (running.get()) {
>             ConsumerRecords records = consumer.poll(2000L);
>             records.forEach(record -> {
>                 System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
> record.key(), record.value(),
>                         record.partition(), record.offset());
>             });
>
>         }
>     } finally {
>           consumer.close();
>     }
>
>    }
>
>    public static void main(String[] args) throws InterruptedException {
>        MyKafkaConumer.INSTANCE.start();
>        MyKafkaConumer.INSTANCE.join();
> }
> }
>
> I only have a single Consumer with a single thread running . What could be
> the reason for the Kafka Consumer to stop processing from certain
> partitions while the processing is happening for other partitions even
> though the producer is sending message to the partitions where it was
> stuck ? Any help here is very much appreciated.
>
> Thanks
> Dev Loper
>
>
>
>
>

Reply via email to