Are you sure your consumer was still the owner of your partitions?
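
One way to check this is to record when each partition last delivered a record, and then compare the silent partitions against what consumer.assignment() reports. The class below is only a rough sketch of that bookkeeping: PartitionStallDetector, observe and stalledPartitions are hypothetical names, not part of the Kafka client, and the 5 second threshold is an arbitrary assumption. It uses plain Java so it can be tried without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not a Kafka API): remembers when each partition last delivered a record. */
final class PartitionStallDetector {
    private final long maxSilenceMs;
    // partition -> time (ms) of the last record seen; TreeMap keeps the report sorted by partition
    private final Map<Integer, Long> lastSeenMs = new TreeMap<>();
    // partition -> offset of the last record seen
    private final Map<Integer, Long> lastOffset = new TreeMap<>();

    PartitionStallDetector(long maxSilenceMs) {
        this.maxSilenceMs = maxSilenceMs;
    }

    /** Call once per consumed record, e.g. with record.partition() and record.offset(). */
    void observe(int partition, long offset, long nowMs) {
        lastSeenMs.put(partition, nowMs);
        lastOffset.put(partition, offset);
    }

    /** Partitions that delivered records earlier but have been silent for more than maxSilenceMs. */
    List<Integer> stalledPartitions(long nowMs) {
        List<Integer> stalled = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : lastSeenMs.entrySet()) {
            if (nowMs - entry.getValue() > maxSilenceMs) {
                stalled.add(entry.getKey());
            }
        }
        return stalled;
    }

    /** Last offset seen for the partition, or null if it never delivered a record. */
    Long lastOffset(int partition) {
        return lastOffset.get(partition);
    }

    public static void main(String[] args) {
        PartitionStallDetector detector = new PartitionStallDetector(5_000L);
        detector.observe(0, 100L, 1_000L);
        detector.observe(1, 200L, 1_000L);
        detector.observe(0, 101L, 7_000L); // partition 1 stays silent
        System.out.println(detector.stalledPartitions(7_000L)); // prints [1]
    }
}
```

In the poll loop you would call observe(record.partition(), record.offset(), System.currentTimeMillis()) for every record and log stalledPartitions(...) after each poll. If a stalled partition is missing from consumer.assignment(), a rebalance has probably handed it to another member of the same group.id; if it is still assigned, ownership is not the cause and the problem lies elsewhere.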

On Mon, Jul 9, 2018, 12:54 PM dev loper <spark...@gmail.com> wrote:

> Hi Kafka Users,
>
> I am desperate to find an answer to this issue. Could it be caused by
> having only a single Kafka consumer? Where should I look for answers? Does
> it have something to do with the Kafka broker? I am using Kafka version
> 0.11.01 for both the Kafka broker and the client.
>
> Thanks
>
> Dev Loper
>
> On Fri, Jul 6, 2018 at 11:01 PM, dev loper <spark...@gmail.com> wrote:
>
> > Hi Kafka Streams Users,
> >
> > I have posted the same question on Stack Overflow; if anybody could
> > point me in the right direction, it would be a great help.
> >
> > https://stackoverflow.com/questions/51214506/kafka-consumer-hung-on-certain-partitions-single-kafka-consumer
> >
> >
> > On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com> wrote:
> >
> >> Hi Kafka Streams Users,
> >>
> >> In my test environment, I have three Kafka brokers and a topic with 16
> >> partitions. There are 16 IoT devices posting messages to Kafka, and a
> >> single system with one Kafka consumer subscribed to this topic. Each IoT
> >> device posts a message to Kafka every second, and the messages are
> >> distributed uniformly across the partitions. On each IoT device I print
> >> the offset and partition the data was posted to, using the Kafka producer
> >> callback. My consumer randomly stops consuming messages from certain
> >> partitions while it continues to process records from the other
> >> partitions. I verified the IoT device logs and could see that data is
> >> still being posted to the partitions the consumer has stopped consuming
> >> from, and the offsets for those partitions keep incrementing. There are
> >> no exceptions or errors of any kind in the consumer; I simply see no
> >> processing logs for the partitions that stopped.
> >>
> >>
> >> Below is pseudocode for my consumer, which closely resembles the code I
> >> am using in my application.
> >>
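> >> import java.util.Arrays;
> >> import java.util.Properties;
> >> import java.util.concurrent.atomic.AtomicBoolean;
> >> import org.apache.kafka.clients.consumer.ConsumerConfig;
> >> import org.apache.kafka.clients.consumer.ConsumerRecords;
> >> import org.apache.kafka.clients.consumer.KafkaConsumer;
> >> import org.apache.kafka.common.serialization.StringDeserializer;
> >>
> >> public class MyKafkaConumer extends Thread {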
> >>
> >>    private static final AtomicBoolean running= new AtomicBoolean(true);
> >>    private static final KafkaConsumer consumer;
> >>    public static final MyKafkaConumer INSTANCE = new MyKafkaConumer();
> >>
> >>    static {
> >>
> >>           Properties props = new Properties();
> >>           props.put("bootstrap.servers",
> >>                   "kafkaServer101:9092,kafkaServer102:9092,kafkaServer103:9092");
> >>           props.put("group.id", "mykafa-group");
> >>           props.put("enable.auto.commit", "true");
> >>           props.put("auto.commit.interval.ms", "1000");
> >>           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >>                   StringDeserializer.class.getName());
> >>           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >>                   IOTDeserializer.class.getName());
> >>           consumer = new KafkaConsumer(props);
> >>           consumer.subscribe(Arrays.asList("mytopic"));
> >>
> >>        }
> >>
> >>      private MyKafkaConumer() {
> >>           super("MyKafkaConumer");
> >>     }
> >>
> >>    public void run() {
> >>
> >>
> >>        try {
> >>         while (running.get()) {
> >>             ConsumerRecords<String, Object> records = consumer.poll(2000L);
> >>             records.forEach(record -> {
> >>                 // The key is a String (StringDeserializer), so format it with %s, not %d
> >>                 System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
> >>                         record.key(), record.value(),
> >>                         record.partition(), record.offset());
> >>             });
> >>
> >>         }
> >>     } finally {
> >>           consumer.close();
> >>     }
> >>
> >>    }
> >>
> >>    public static void main(String[] args) throws InterruptedException {
> >>        MyKafkaConumer.INSTANCE.start();
> >>        MyKafkaConumer.INSTANCE.join();
> >> }
> >> }
> >>
> >> I have only a single consumer with a single thread running. What could
> >> cause the Kafka consumer to stop processing certain partitions while it
> >> keeps processing others, even though the producer is still sending
> >> messages to the partitions where it is stuck? Any help here is very much
> >> appreciated.
> >>
> >> Thanks
> >> Dev Loper
> >>
> >>
> >>
> >>
> >>
> >
>
