Hi Kafka Users,

I am desperate to find an answer to this issue. Could it be caused by
running only a single Kafka consumer? Where should I look for the cause?
Does it have something to do with the Kafka broker? I am using Kafka
version 0.11.01 for both the Kafka broker and the client.

Thanks

Dev Loper

On Fri, Jul 6, 2018 at 11:01 PM, dev loper <spark...@gmail.com> wrote:

> Hi Kafka Streams Users,
>
> I have posted the same question on Stack Overflow; if anybody could
> point me in the right direction, it would be a great help.
>
> https://stackoverflow.com/questions/51214506/kafka-consumer-hung-on-certain-partitions-single-kafka-consumer
>
>
> On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com> wrote:
>
>> Hi Kafka Streams Users,
>>
>> In my test environment I have three Kafka brokers and a topic with 16
>> partitions. There are 16 IoT devices posting messages to Kafka, and a
>> single system with one Kafka consumer subscribed to this topic. Each IoT
>> device posts a message to Kafka every second, and the messages are
>> distributed uniformly across the partitions. On each IoT device I print
>> the offset and partition each message was written to, using the Kafka
>> producer callback. My consumer randomly stops consuming messages from
>> certain partitions while it continues to process records from the other
>> partitions. I checked the IoT device logs and can see that data is still
>> being posted to the partitions where the consumer has stopped, and that
>> the offsets for those partitions keep increasing. There are no exceptions
>> or errors of any kind in the consumer; I simply see no processing logs
>> for the partitions that stopped.
>>
>>
>> Below is pseudocode for my consumer, which closely resembles the code I
>> am using in my application.
>>
>> import java.util.Arrays;
>> import java.util.Properties;
>> import java.util.concurrent.atomic.AtomicBoolean;
>>
>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>> import org.apache.kafka.clients.consumer.ConsumerRecords;
>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>> import org.apache.kafka.common.serialization.StringDeserializer;
>>
>> public class MyKafkaConumer extends Thread {
>>
>>     private static final AtomicBoolean running = new AtomicBoolean(true);
>>     // Values are typed as Object here; IOTDeserializer is the
>>     // application's own deserializer class.
>>     private static final KafkaConsumer<String, Object> consumer;
>>     public static final MyKafkaConumer INSTANCE = new MyKafkaConumer();
>>
>>     static {
>>         Properties props = new Properties();
>>         props.put("bootstrap.servers",
>>                 "kafkaServer101:9092,kafkaServer102:9092,kafkaServer103:9092");
>>         props.put("group.id", "mykafa-group");
>>         props.put("enable.auto.commit", "true");
>>         props.put("auto.commit.interval.ms", "1000");
>>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>                 StringDeserializer.class.getName());
>>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>                 IOTDeserializer.class.getName());
>>         consumer = new KafkaConsumer<>(props);
>>         consumer.subscribe(Arrays.asList("mytopic"));
>>     }
>>
>>     private MyKafkaConumer() {
>>         super("MyKafkaConumer");
>>     }
>>
>>     @Override
>>     public void run() {
>>         try {
>>             while (running.get()) {
>>                 // Wait up to 2 seconds for records from any assigned partition.
>>                 ConsumerRecords<String, Object> records = consumer.poll(2000L);
>>                 // The key is a String, so it is formatted with %s, not %d.
>>                 records.forEach(record ->
>>                         System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
>>                                 record.key(), record.value(),
>>                                 record.partition(), record.offset()));
>>             }
>>         } finally {
>>             consumer.close();
>>         }
>>     }
>>
>>     public static void main(String[] args) throws InterruptedException {
>>         MyKafkaConumer.INSTANCE.start();
>>         MyKafkaConumer.INSTANCE.join();
>>     }
>> }
>>
>> I have only a single consumer running on a single thread. What could
>> cause the Kafka consumer to stop processing certain partitions while it
>> keeps processing the others, even though the producers are still sending
>> messages to the partitions where it is stuck? Any help here is very much
>> appreciated.
>>
>> Thanks
>> Dev Loper
>
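
For the stall described above (some partitions stop being consumed while
their offsets keep advancing), one way to narrow it down might be to sample
the consumer's position and the log end offset for every assigned partition
at intervals, and flag partitions whose position has not moved although
unread data exists. Below is a minimal sketch of that comparison in plain
Java. The class and method names (PartitionLagCheck, lagByPartition,
suspectedStalled) are hypothetical, and partitions are keyed by their number
rather than by TopicPartition so that the example runs without the Kafka
client on the classpath. In the real consumer the maps would presumably be
filled from consumer.position(tp) and consumer.endOffsets(consumer.assignment()),
called on the polling thread because KafkaConsumer is not thread-safe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the Kafka client library.
class PartitionLagCheck {

    // Lag per partition = log end offset minus the consumer's position.
    // Partitions without a known position are skipped.
    public static Map<Integer, Long> lagByPartition(Map<Integer, Long> endOffsets,
                                                    Map<Integer, Long> positions) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            Long position = positions.get(e.getKey());
            if (position != null) {
                lag.put(e.getKey(), Math.max(0L, e.getValue() - position));
            }
        }
        return lag;
    }

    // A partition is suspected to be stalled when its position is unchanged
    // between two samples while the end offset is ahead of that position.
    // An idle partition (position equal to end offset) is not reported.
    public static List<Integer> suspectedStalled(Map<Integer, Long> previousPositions,
                                                 Map<Integer, Long> currentPositions,
                                                 Map<Integer, Long> currentEndOffsets) {
        List<Integer> stalled = new ArrayList<>();
        // TreeMap gives a deterministic, ascending partition order.
        for (Map.Entry<Integer, Long> e : new TreeMap<>(currentPositions).entrySet()) {
            Long before = previousPositions.get(e.getKey());
            Long end = currentEndOffsets.get(e.getKey());
            if (before != null && end != null
                    && before.equals(e.getValue())
                    && end > e.getValue()) {
                stalled.add(e.getKey());
            }
        }
        return stalled;
    }

    public static void main(String[] args) {
        // Made-up sample values for three partitions, for illustration only.
        Map<Integer, Long> previous = new HashMap<>();
        previous.put(0, 100L);
        previous.put(1, 200L);
        previous.put(2, 300L);

        Map<Integer, Long> current = new HashMap<>();
        current.put(0, 160L); // advanced
        current.put(1, 200L); // unchanged, data pending
        current.put(2, 300L); // unchanged, but nothing new to read

        Map<Integer, Long> endOffsets = new HashMap<>();
        endOffsets.put(0, 160L);
        endOffsets.put(1, 260L);
        endOffsets.put(2, 300L);

        System.out.println("lag: " + lagByPartition(endOffsets, current));
        System.out.println("suspected stalled: "
                + suspectedStalled(previous, current, endOffsets));
    }
}
```

With the made-up values in main, only partition 1 is flagged: its position
stayed at 200 while the end offset moved to 260. Logging this from inside the
poll loop every few iterations would show whether the stuck partitions are
still assigned to the consumer and how far behind they are.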
