When polling from Kafka I am logging the number of records fetched.
I have 200 partitions for the topic and 12 Consumers, each with 4 threads,
so around 4-5 partitions per Consumer thread, and if the Consumer is slow
the number of messages it consumes per poll increases to around 7000.
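
A rough back-of-the-envelope sketch of those numbers, for comparison with
the ~7000 figure. It assumes partitions are spread evenly over all 48
threads, that each fetch returns at most max.partition.fetch.bytes (8192)
per partition, and that messages are 200-300 bytes uncompressed with no
per-message overhead. The class and method names are made up for
illustration, and compression (which the original message below says is
enabled) is not modelled, so real counts could be higher.

```java
public class FetchEstimate {
    // Largest number of partitions one thread gets if partitions are
    // divided as evenly as possible across all consumer threads.
    public static int maxPartitionsPerThread(int partitions, int consumers, int threadsPerConsumer) {
        int threads = consumers * threadsPerConsumer;
        return (partitions + threads - 1) / threads; // ceiling division
    }

    // Rough upper bound on records in one fetch, assuming at most
    // maxPartitionFetchBytes per partition and a fixed record size.
    // Ignores message overhead and compression (simplifying assumptions).
    public static long maxRecordsPerFetch(int partitionsPerThread, int maxPartitionFetchBytes, int bytesPerRecord) {
        return (long) partitionsPerThread * (maxPartitionFetchBytes / bytesPerRecord);
    }

    public static void main(String[] args) {
        int perThread = maxPartitionsPerThread(200, 12, 4);
        System.out.println("max partitions per thread: " + perThread);
        System.out.println("records per fetch at 200 bytes: " + maxRecordsPerFetch(perThread, 8192, 200));
        System.out.println("records per fetch at 300 bytes: " + maxRecordsPerFetch(perThread, 8192, 300));
    }
}
```

Under these assumptions the estimate is 5 partitions per thread and
roughly 135-200 records per fetch, far below the ~7000 being logged.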

Here is the code I am using; it's the consumer implementation
suggested in the Confluent documentation:

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        doCommitSync();
      }

      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
    });

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    try {
      doCommitSync();
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }
}
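
Jens's reply quoted below points to KIP-41, which adds a max.poll.records
consumer setting in the 0.10 release. A minimal sketch of how the
configuration might look after upgrading, reusing property values from
this thread. The class name and the value 500 are illustrative
assumptions, and the setting is not available in the 0.9.0.1 consumer.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Builds consumer properties similar to the ones in this thread,
    // plus the record cap described in KIP-41.
    public static Properties build(String servers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", "perf-test");
        props.put("enable.auto.commit", "false");
        // Bounds bytes fetched per partition; it does not bound records per poll().
        props.put("max.partition.fetch.bytes", "8192");
        // KIP-41 (0.10 and later): caps the records returned by a single poll().
        // 500 is an arbitrary example value, not a recommendation.
        props.put("max.poll.records", "500");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

The resulting Properties object would be passed to the KafkaConsumer
constructor in place of the existing one; the poll loop above stays the
same.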


On Wed, May 4, 2016 at 10:38 PM Jaikiran Pai <jai.forums2...@gmail.com>
wrote:

> Going by the name of that property (max.partition.fetch.bytes), I'm
> guessing it's the max fetch bytes per partition of a topic. Are you sure
> the data you are receiving in that consumer doesn't belong to multiple
> partitions and hence can/might exceed the value that's set per
> partition? By the way, what does the consumer code look like, where you
> are verifying/measuring this consumed size?
>
> -Jaikiran
> On Thursday 05 May 2016 03:00 AM, Abhinav Solan wrote:
> > Thanks a lot Jens for the reply.
> > One thing is still unclear: does this happen only when
> > max.partition.fetch.bytes is set to a higher value? I am setting it
> > quite low, at only 8192, because I can control the size of the data
> > coming into Kafka. Even with this value set, why is the Consumer
> > fetching more records? Is the Consumer not honoring this property, or is
> > there some other logic which is making it fetch more data?
> >
> > Thanks,
> > Abhinav
> >
> > On Wed, May 4, 2016 at 1:40 PM Jens Rantil <jens.ran...@tink.se> wrote:
> >
> >> Hi,
> >>
> >> This is a known issue. The 0.10 release will fix this. See
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
> >> for some background.
> >>
> >> Cheers,
> >> Jens
> >>
> >> On Wed, 4 May 2016 at 19:32, Abhinav Solan <abhinav.so...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> I am using kafka-0.9.0.1 and have configured the Kafka consumer to
> >>> fetch 8192 bytes by setting max.partition.fetch.bytes.
> >>>
> >>> Here are the properties I am using:
> >>>
> >>> props.put("bootstrap.servers", servers);
> >>> props.put("group.id", "perf-test");
> >>> props.put("offset.storage", "kafka");
> >>> props.put("enable.auto.commit", "false");
> >>> props.put("session.timeout.ms", 60000);
> >>> props.put("request.timeout.ms", 70000);
> >>> props.put("heartbeat.interval.ms", 50000);
> >>> props.put("auto.offset.reset", "latest");
> >>> props.put("max.partition.fetch.bytes", "8192");
> >>> props.put("key.deserializer",
> >>> "org.apache.kafka.common.serialization.StringDeserializer");
> >>> props.put("value.deserializer",
> >>> "org.apache.kafka.common.serialization.StringDeserializer");
> >>>
> >>> I am setting up 12 Consumers with 4 workers each to listen on a topic
> >>> with 200 partitions.
> >>> I have also enabled compression when sending to Kafka.
> >>>
> >>> The problem I am getting is that, even though the fetch size is small,
> >>> the consumers poll too many records. If the topic has many messages and
> >>> the consumer is behind in consumption, it tries to fetch a bigger
> >>> batch; if the consumer is not behind, it fetches around 45. But either
> >>> way, if I set max.partition.fetch.bytes, shouldn't the fetch size have
> >>> an upper limit? Is there any other setting I am missing here?
> >>> I am controlling the message size myself, so it's not that some bigger
> >>> messages are coming through; each message should be around 200-300
> >>> bytes only.
> >>>
> >>> Due to the large number of messages it is polling, the inner process is
> >>> sometimes not able to finish processing within the heartbeat interval
> >>> limit, which makes consumer rebalancing kick in again and again. This
> >>> only happens when the consumer is way behind in offset, e.g. when there
> >>> are 100000 messages to be processed in the topic.
> >>>
> >>> Thanks
> >>>
> >> --
> >>
> >> Jens Rantil
> >> Backend Developer @ Tink
> >>
> >> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
> >> For urgent matters you can reach me at +46-708-84 18 32.
> >>
>
>
