Can you try executing kafka-consumer-groups to see what it says? Are your
log offsets increasing and the lag increasing when processing for some of
your partitions are stuck?

May be the problem is in the producer side.

Regards
Sab


On Mon, 9 Jul 2018, 11:43 am Steve Tian, <steve.cs.t...@gmail.com> wrote:

> Are you sure your consumer was still the owner of your partitions?
>
> On Mon, Jul 9, 2018, 12:54 PM dev loper <spark...@gmail.com> wrote:
>
> > Hi Kafka Users,
> >
> > I am desperate to find an answer to this issue. I would like to know
> > whether my issue is due to a Single Kafka Consumer ?  Where should I look
> > for answers for this issue?  Is it something do with the Kafka Broker ? I
> > am using Kafka version 0.11.01 version for both Kafka broker an client .
> >
> > Thanks
> >
> > Dev Loper
> >
> > On Fri, Jul 6, 2018 at 11:01 PM, dev loper <spark...@gmail.com> wrote:
> >
> > > Hi Kafka Streams Users,
> > >
> > > I have posted the same question on stackoverflow and if anybody could
> > > point some directions  it would be of great help.
> > >
> > > https://stackoverflow.com/questions/51214506/kafka-
> > > consumer-hung-on-certain-partitions-single-kafka-consumer
> > >
> > >
> > > On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com> wrote:
> > >
> > >> Hi Kafka Streams Users,
> > >>
> > >>  My test environment, I have three Kafka brokers and my topic is
> having
> > >> 16 partitions, there are 16 IOT Devices which are posting the messages
> > to
> > >> Kafka. I have a single system with one Kafka Consumer which subscribes
> > to
> > >> this topic. Each IOT devices are posting  the message to Kafka every
> > second
> > >> and its distributed uniformly. I am
> > >> printing the offset and partition to which the data posted using Kafka
> > >> Producer Call back method on these each IOT device . My consumer stops
> > >> consuming the messages from certain partitions randomly and at the
> same
> > >> time its processing records from other partitions. I actually verified
> > the
> > >> IOT device logs and I could see that the data is actually getting
> > posted to
> > >> the particular partitions where the consumer has stopped consuming and
> > I am
> > >> able to see the offsets getting incremented for those partitions.
> There
> > are
> > >> no exceptions or any error of any kind in the consumer except that I
> > don't
> > >> see any processing logs for the partitions which stopped processing.
> > >>
> > >>
> > >> Below I have given my pseudo code for my consumer  which almost
> > resembles
> > >> the code which I am using in my application .
> > >>
> > >> public class MyKafkaConumer extends Thread {
> > >>
> > >>    private static final AtomicBoolean running= new
> AtomicBoolean(true);
> > >>    private static final KafkaConsumer consumer;
> > >>    public static final MyKafkaConumer INSTANCE = new MyKafkaConumer();
> > >>
> > >>    static {
> > >>
> > >>           Properties props = new Properties();
> > >>           props.put("bootstrap.servers",
> "kafkaServer101:9092,kafkaServ
> > >> er102:9092,kafkaServer103:9092");
> > >>           props.put("group.id", "mykafa-group");
> > >>           props.put("enable.auto.commit", "true");
> > >>           props.put("auto.commit.interval.ms", "1000");
> > >>           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >>                   StringDeserializer.class.getName());
> > >>           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >>                   IOTDeserializer.class.getName());
> > >>           consumer = new KafkaConsumer(props);
> > >>           consumer.subscribe(Arrays.asList("mytopic"));
> > >>
> > >>        }
> > >>
> > >>      private MyKafkaConumer() {
> > >>           super("MyKafkaConumer");
> > >>     }
> > >>
> > >>    public void run() {
> > >>
> > >>
> > >>        try {
> > >>         while (running.get()) {
> > >>             ConsumerRecords records = consumer.poll(2000L);
> > >>             records.forEach(record -> {
> > >>                 System.out.printf("Consumer Record:(%d, %s, %d,
> %d)\n",
> > >> record.key(), record.value(),
> > >>                         record.partition(), record.offset());
> > >>             });
> > >>
> > >>         }
> > >>     } finally {
> > >>           consumer.close();
> > >>     }
> > >>
> > >>    }
> > >>
> > >>    public static void main(String[] args) throws InterruptedException
> {
> > >>        MyKafkaConumer.INSTANCE.start();
> > >>        MyKafkaConumer.INSTANCE.join();
> > >> }
> > >> }
> > >>
> > >> I only have a single Consumer with a single thread running . What
> could
> > >> be the reason for the Kafka Consumer to stop processing from certain
> > >> partitions while the processing is happening for other partitions even
> > >> though the producer is sending message to the partitions where it was
> > >> stuck ? Any help here is very much appreciated.
> > >>
> > >> Thanks
> > >> Dev Loper
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> >
>

Reply via email to