The processing does not take much time, since the application reads from
Kafka and immediately puts the records into an in-memory queue; a separate
thread persists these records into the DB.
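
For reference, a minimal sketch of that hand-off pattern (the class name,
queue size, and persistence stub are illustrative, not our actual
application code; String values stand in for the real IOT payload):

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class QueueingConsumerSketch {

    // Bounded hand-off queue between the poll loop and the persister thread.
    private static final BlockingQueue<ConsumerRecord<String, String>> QUEUE =
            new ArrayBlockingQueue<>(10_000);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkaServer101:9092");
        props.put("group.id", "mykafa-group");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Persister thread: drains the queue and writes records to the DB
        // (stubbed out here as a print).
        Thread persister = new Thread(() -> {
            try {
                while (true) {
                    persist(QUEUE.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "persister");
        persister.setDaemon(true);
        persister.start();

        // Poll loop: hands records off immediately, so the downstream DB
        // work never delays the next poll().
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("mytopic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(2000L)) {
                    QUEUE.put(record); // blocks if the persister falls behind
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void persist(ConsumerRecord<String, String> record) {
        // Stand-in for the real DB write.
        System.out.printf("persisting (%s, %s, %d, %d)%n",
                record.key(), record.value(), record.partition(), record.offset());
    }
}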

On Mon, Jul 9, 2018 at 12:44 PM, dev loper <spark...@gmail.com> wrote:

> Hi Steve,
>
> When the issue occurs, I can see lag for the corresponding partitions in
> the output of the kafka-consumer-groups command. I will change the log
> level to DEBUG for the Kafka consumer and post anything that looks
> outside the normal range.
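>
> For reference, a command along these lines shows the per-partition lag
> (the group id comes from my consumer config further down this thread;
> adjust the bootstrap server to your environment):
>
>   bin/kafka-consumer-groups.sh --bootstrap-server kafkaServer101:9092 \
>     --describe --group mykafa-group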
>
> Thank you
> Dev Loper
>
> On Mon, Jul 9, 2018 at 12:24 PM, Steve Tian <steve.cs.t...@gmail.com>
> wrote:
>
>> Also, have you tried enabling DEBUG-level logging for the KafkaConsumer?
>> How long does it take to process the records in a single batch?
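>>
>> (With the default log4j setup that is usually a single line in the
>> client's log4j.properties, for example:
>>
>>   log4j.logger.org.apache.kafka.clients.consumer=DEBUG
>>
>> Adjust to whatever logging framework your client uses.)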
>>
>> On Mon, Jul 9, 2018, 2:51 PM Sabarish Sasidharan <sabarish....@gmail.com>
>> wrote:
>>
>> > Can you run kafka-consumer-groups to see what it says? Are your log
>> > offsets increasing, and is the lag growing, while processing for some
>> > of your partitions is stuck?
>> >
>> > Maybe the problem is on the producer side.
>> >
>> > Regards
>> > Sab
>> >
>> >
>> > On Mon, 9 Jul 2018, 11:43 am Steve Tian, <steve.cs.t...@gmail.com>
>> wrote:
>> >
>> > > Are you sure your consumer was still the owner of your partitions?
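>> > >
>> > > (One way to check, sketched against the standard consumer API: pass a
>> > > ConsumerRebalanceListener when subscribing and log every assignment
>> > > change. On top of the code quoted below this needs java.util.Collection,
>> > > org.apache.kafka.common.TopicPartition and
>> > > org.apache.kafka.clients.consumer.ConsumerRebalanceListener.)
>> > >
>> > > consumer.subscribe(Arrays.asList("mytopic"),
>> > >         new ConsumerRebalanceListener() {
>> > >             @Override
>> > >             public void onPartitionsAssigned(Collection<TopicPartition> parts) {
>> > >                 // Fired after a rebalance: the partitions now owned.
>> > >                 System.out.println("assigned: " + parts);
>> > >             }
>> > >             @Override
>> > >             public void onPartitionsRevoked(Collection<TopicPartition> parts) {
>> > >                 // Fired before a rebalance takes partitions away.
>> > >                 System.out.println("revoked: " + parts);
>> > >             }
>> > >         });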
>> > >
>> > > On Mon, Jul 9, 2018, 12:54 PM dev loper <spark...@gmail.com> wrote:
>> > >
>> > > > Hi Kafka Users,
>> > > >
>> > > > I am desperate to find an answer to this issue. Is it caused by
>> > > > using a single Kafka consumer? Where should I look for answers? Is
>> > > > it something to do with the Kafka broker? I am using Kafka 0.11.0.1
>> > > > for both the broker and the client.
>> > > >
>> > > > Thanks
>> > > >
>> > > > Dev Loper
>> > > >
>> > > > On Fri, Jul 6, 2018 at 11:01 PM, dev loper <spark...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hi Kafka Streams Users,
>> > > > >
>> > > > > I have posted the same question on Stack Overflow; if anybody
>> > > > > could point me in some direction, it would be a great help.
>> > > > >
>> > > > > https://stackoverflow.com/questions/51214506/kafka-consumer-hung-on-certain-partitions-single-kafka-consumer
>> > > > >
>> > > > >
>> > > > > On Fri, Jul 6, 2018 at 10:25 PM, dev loper <spark...@gmail.com>
>> > wrote:
>> > > > >
>> > > > >> Hi Kafka Streams Users,
>> > > > >>
>> > > > >> In my test environment I have three Kafka brokers, and my topic
>> > > > >> has 16 partitions. There are 16 IOT devices posting messages to
>> > > > >> Kafka, and a single machine running one Kafka consumer that
>> > > > >> subscribes to this topic. Each IOT device posts a message to
>> > > > >> Kafka every second, and the load is distributed uniformly across
>> > > > >> the partitions. On each device I print the offset and the
>> > > > >> partition to which the data was posted, using the Kafka producer
>> > > > >> callback. My consumer randomly stops consuming messages from
>> > > > >> certain partitions while, at the same time, it keeps processing
>> > > > >> records from the other partitions. I have verified the IOT device
>> > > > >> logs: data is still being posted to the partitions where the
>> > > > >> consumer has stopped consuming, and the offsets for those
>> > > > >> partitions keep incrementing. There are no exceptions or errors
>> > > > >> of any kind in the consumer; I simply see no processing logs for
>> > > > >> the partitions that stopped processing.
>> > > > >>
>> > > > >>
>> > > > >> Below is pseudocode for my consumer, which closely resembles the
>> > > > >> code I am using in my application.
>> > > > >>
>> > > > >> import java.util.Arrays;
>> > > > >> import java.util.Properties;
>> > > > >> import java.util.concurrent.atomic.AtomicBoolean;
>> > > > >>
>> > > > >> import org.apache.kafka.clients.consumer.ConsumerConfig;
>> > > > >> import org.apache.kafka.clients.consumer.ConsumerRecords;
>> > > > >> import org.apache.kafka.clients.consumer.KafkaConsumer;
>> > > > >> import org.apache.kafka.common.serialization.StringDeserializer;
>> > > > >>
>> > > > >> public class MyKafkaConsumer extends Thread {
>> > > > >>
>> > > > >>     // Loop flag; cleared externally to stop the consumer.
>> > > > >>     private static final AtomicBoolean running = new AtomicBoolean(true);
>> > > > >>     // The value type is whatever IOTDeserializer produces;
>> > > > >>     // Object is a placeholder here.
>> > > > >>     private static final KafkaConsumer<String, Object> consumer;
>> > > > >>     public static final MyKafkaConsumer INSTANCE = new MyKafkaConsumer();
>> > > > >>
>> > > > >>     static {
>> > > > >>         Properties props = new Properties();
>> > > > >>         props.put("bootstrap.servers",
>> > > > >>                 "kafkaServer101:9092,kafkaServer102:9092,kafkaServer103:9092");
>> > > > >>         props.put("group.id", "mykafa-group");
>> > > > >>         props.put("enable.auto.commit", "true");
>> > > > >>         props.put("auto.commit.interval.ms", "1000");
>> > > > >>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>> > > > >>                 StringDeserializer.class.getName());
>> > > > >>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>> > > > >>                 IOTDeserializer.class.getName());
>> > > > >>         consumer = new KafkaConsumer<>(props);
>> > > > >>         consumer.subscribe(Arrays.asList("mytopic"));
>> > > > >>     }
>> > > > >>
>> > > > >>     private MyKafkaConsumer() {
>> > > > >>         super("MyKafkaConsumer");
>> > > > >>     }
>> > > > >>
>> > > > >>     @Override
>> > > > >>     public void run() {
>> > > > >>         try {
>> > > > >>             while (running.get()) {
>> > > > >>                 // Poll with a 2s timeout; an empty batch is
>> > > > >>                 // returned if nothing arrives in time.
>> > > > >>                 ConsumerRecords<String, Object> records = consumer.poll(2000L);
>> > > > >>                 records.forEach(record ->
>> > > > >>                         // key is a String, so %s rather than %d
>> > > > >>                         System.out.printf("Consumer Record:(%s, %s, %d, %d)%n",
>> > > > >>                                 record.key(), record.value(),
>> > > > >>                                 record.partition(), record.offset()));
>> > > > >>             }
>> > > > >>         } finally {
>> > > > >>             consumer.close();
>> > > > >>         }
>> > > > >>     }
>> > > > >>
>> > > > >>     public static void main(String[] args) throws InterruptedException {
>> > > > >>         MyKafkaConsumer.INSTANCE.start();
>> > > > >>         MyKafkaConsumer.INSTANCE.join();
>> > > > >>     }
>> > > > >> }
>> > > > >>
>> > > > >> I have only a single consumer, with a single thread running. What
>> > > > >> could cause the Kafka consumer to stop processing certain
>> > > > >> partitions while processing continues for the others, even though
>> > > > >> the producer is still sending messages to the stuck partitions?
>> > > > >> Any help here is very much appreciated.
>> > > > >>
>> > > > >> Thanks
>> > > > >> Dev Loper
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
