Hi Steve,

Yes, I am sure my consumer was the only owner of the partitions.  The
partitions are getting recovered at random intervals and there is only
single consumer running in the entire environment . But since the data is
time critical ,  its leading to data loss.  I wanted to go deep and
understand why this issue persists and what could be the reason. Any
pointer on how to debug this issue would be highly helpful .




On Mon, Jul 9, 2018 at 11:43 AM, Steve Tian <steve.cs.t...@gmail.com> wrote:

> Are you sure your consumer was still the owner of your partitions?
>
> On Mon, Jul 9, 2018, 12:54 PM dev loper <spark...@gmail.com> wrote:
>
> > Hi Kafka Users,
> >
> > I am desperate to find an answer to this issue. I would like to know
> > whether my issue is due to a Single Kafka Consumer ?  Where should I look
> > for answers for this issue?  Is it something do with the Kafka Broker ? I
> > am using Kafka version 0.11.01 version for both Kafka broker an client .
> >
> > Thanks
> >
> > Dev Loper
> >
> > On Fri, Jul 6, 2018 at 11:01 PM, dev loper <spark...@gmail.com> wrote:
> >
> > > Hi Kafka Streams Users,
> > >
> > > I have posted the same question on stackoverflow and if anybody could
> > > point some directions  it would be of great help.
> > >
> > > https://stackoverflow.com/questions/51214506/kafka-
> > > consumer-hung-on-certain-partitions-single-kafka-consumer
> > >
> > >
> > > On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com> wrote:
> > >
> > >> Hi Kafka Streams Users,
> > >>
> > >>  My test environment, I have three Kafka brokers and my topic is
> having
> > >> 16 partitions, there are 16 IOT Devices which are posting the messages
> > to
> > >> Kafka. I have a single system with one Kafka Consumer which subscribes
> > to
> > >> this topic. Each IOT devices are posting  the message to Kafka every
> > second
> > >> and its distributed uniformly. I am
> > >> printing the offset and partition to which the data posted using Kafka
> > >> Producer Call back method on these each IOT device . My consumer stops
> > >> consuming the messages from certain partitions randomly and at the
> same
> > >> time its processing records from other partitions. I actually verified
> > the
> > >> IOT device logs and I could see that the data is actually getting
> > posted to
> > >> the particular partitions where the consumer has stopped consuming and
> > I am
> > >> able to see the offsets getting incremented for those partitions.
> There
> > are
> > >> no exceptions or any error of any kind in the consumer except that I
> > don't
> > >> see any processing logs for the partitions which stopped processing.
> > >>
> > >>
> > >> Below I have given my pseudo code for my consumer  which almost
> > resembles
> > >> the code which I am using in my application .
> > >>
> > >> public class MyKafkaConumer extends Thread {
> > >>
> > >>    private static final AtomicBoolean running= new
> AtomicBoolean(true);
> > >>    private static final KafkaConsumer consumer;
> > >>    public static final MyKafkaConumer INSTANCE = new MyKafkaConumer();
> > >>
> > >>    static {
> > >>
> > >>           Properties props = new Properties();
> > >>           props.put("bootstrap.servers",
> "kafkaServer101:9092,kafkaServ
> > >> er102:9092,kafkaServer103:9092");
> > >>           props.put("group.id", "mykafa-group");
> > >>           props.put("enable.auto.commit", "true");
> > >>           props.put("auto.commit.interval.ms", "1000");
> > >>           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >>                   StringDeserializer.class.getName());
> > >>           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >>                   IOTDeserializer.class.getName());
> > >>           consumer = new KafkaConsumer(props);
> > >>           consumer.subscribe(Arrays.asList("mytopic"));
> > >>
> > >>        }
> > >>
> > >>      private MyKafkaConumer() {
> > >>           super("MyKafkaConumer");
> > >>     }
> > >>
> > >>    public void run() {
> > >>
> > >>
> > >>        try {
> > >>         while (running.get()) {
> > >>             ConsumerRecords records = consumer.poll(2000L);
> > >>             records.forEach(record -> {
> > >>                 System.out.printf("Consumer Record:(%d, %s, %d,
> %d)\n",
> > >> record.key(), record.value(),
> > >>                         record.partition(), record.offset());
> > >>             });
> > >>
> > >>         }
> > >>     } finally {
> > >>           consumer.close();
> > >>     }
> > >>
> > >>    }
> > >>
> > >>    public static void main(String[] args) throws InterruptedException
> {
> > >>        MyKafkaConumer.INSTANCE.start();
> > >>        MyKafkaConumer.INSTANCE.join();
> > >> }
> > >> }
> > >>
> > >> I only have a single Consumer with a single thread running . What
> could
> > >> be the reason for the Kafka Consumer to stop processing from certain
> > >> partitions while the processing is happening for other partitions even
> > >> though the producer is sending message to the partitions where it was
> > >> stuck ? Any help here is very much appreciated.
> > >>
> > >> Thanks
> > >> Dev Loper
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> >
>

Reply via email to