When polling from Kafka I am logging the number of records fetched. I have 200 partitions for the topic and 12 consumers, each with 4 threads, so around 4-5 partitions per consumer thread, and if a consumer is slow the number of messages it consumes in a single poll increases to around 7000.
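Roughly, the per-poll count is logged like the following minimal, self-contained sketch (the topic name, bootstrap servers, and logger setup are placeholders, not the exact production code):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PollCountLogger {
    private static final Logger log = LoggerFactory.getLogger(PollCountLogger.class);

    public static void main(String[] args) {
        // Same style of configuration as the full properties listing quoted further down.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "perf-test");
        props.put("enable.auto.commit", "false");
        props.put("max.partition.fetch.bytes", "8192");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("perf-topic")); // placeholder topic name
            while (true) {
                // poll(long) is the 0.9.x API; this count is what grows to ~7000 when the consumer falls behind
                ConsumerRecords<String, String> records = consumer.poll(1000);
                log.info("Fetched {} records in this poll", records.count());
            }
        }
    }
}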
Here is the code I am using; it's the consumer implementation suggested in the Confluent documentation:

private void doCommitSync() {
    try {
        consumer.commitSync();
    } catch (WakeupException e) {
        // we're shutting down, but finish the commit first and then
        // rethrow the exception so that the main loop can exit
        doCommitSync();
        throw e;
    } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        log.debug("Commit failed", e);
    }
}

public void run() {
    try {
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                doCommitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
        });

        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
            records.forEach(record -> process(record));
            consumer.commitAsync();
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        try {
            doCommitSync();
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }
}

On Wed, May 4, 2016 at 10:38 PM Jaikiran Pai <jai.forums2...@gmail.com> wrote:

> Going by the name of that property (max.partition.fetch.bytes), I'm
> guessing it's the max fetch bytes per partition of a topic. Are you sure
> the data you are receiving in that consumer doesn't belong to multiple
> partitions and hence can/might exceed the value that's set per
> partition? By the way, what does the consumer code look like, where you
> are verifying/measuring this consumed size?
>
> -Jaikiran
>
> On Thursday 05 May 2016 03:00 AM, Abhinav Solan wrote:
> > Thanks a lot Jens for the reply.
> > One thing is still unclear: is this happening only when we set
> > max.partition.fetch.bytes to a higher value? I am setting it quite low,
> > at only 8192, because I can control the size of the data coming into
> > Kafka. So even after setting this value, why is the consumer fetching
> > more records? Is the consumer not honoring this property, or is there
> > some other logic which is making it fetch more data?
> >
> > Thanks,
> > Abhinav
> >
> > On Wed, May 4, 2016 at 1:40 PM Jens Rantil <jens.ran...@tink.se> wrote:
> >
> >> Hi,
> >>
> >> This is a known issue. The 0.10 release will fix this. See
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
> >> for some background.
> >>
> >> Cheers,
> >> Jens
> >>
> >> On Wed, May 4, 2016 at 19:32, Abhinav Solan <abhinav.so...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> I am using kafka-0.9.0.1 and have configured the Kafka consumer to fetch
> >>> 8192 bytes by setting max.partition.fetch.bytes.
> >>>
> >>> Here are the properties I am using:
> >>>
> >>> props.put("bootstrap.servers", servers);
> >>> props.put("group.id", "perf-test");
> >>> props.put("offset.storage", "kafka");
> >>> props.put("enable.auto.commit", "false");
> >>> props.put("session.timeout.ms", 60000);
> >>> props.put("request.timeout.ms", 70000);
> >>> props.put("heartbeat.interval.ms", 50000);
> >>> props.put("auto.offset.reset", "latest");
> >>> props.put("max.partition.fetch.bytes", "8192");
> >>> props.put("key.deserializer",
> >>>     "org.apache.kafka.common.serialization.StringDeserializer");
> >>> props.put("value.deserializer",
> >>>     "org.apache.kafka.common.serialization.StringDeserializer");
> >>>
> >>> I am setting up 12 consumers with 4 workers each to listen on a topic
> >>> with 200 partitions. I have also enabled compression when sending to
> >>> Kafka.
> >>>
> >>> The problem I am getting is that even though the fetch size is small,
> >>> the consumers poll too many records. If the topic has many messages and
> >>> the consumer is behind in consumption, it fetches a bigger batch; if the
> >>> consumer is not behind, it fetches around 45 records. Either way, if I
> >>> set max.partition.fetch.bytes, shouldn't the fetch size have an upper
> >>> limit? Is there any other setting I am missing here? I am controlling
> >>> the message size myself, so it's not that some bigger messages are
> >>> coming through; each message should be around 200-300 bytes only.
> >>>
> >>> Due to the large number of messages it polls, the inner processing is
> >>> sometimes not able to finish within the heartbeat interval limit, which
> >>> makes consumer rebalancing kick in again and again. This only happens
> >>> when the consumer is far behind in offsets, e.g. when there are 100000
> >>> messages to be processed in the topic.
> >>>
> >>> Thanks
> >>>
> >> --
> >>
> >> Jens Rantil
> >> Backend Developer @ Tink
> >>
> >> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
> >> For urgent matters you can reach me at +46-708-84 18 32.
> >>
> >
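As a note on the fix Jens points to above: KIP-41 adds a max.poll.records consumer property in the 0.10 release, which directly caps how many records a single poll() can return, independently of max.partition.fetch.bytes. A minimal sketch of what the configuration might look like after upgrading to 0.10.x (the cap of 100 is an illustrative value, not a recommendation):

// Sketch of a 0.10.x-style consumer configuration; max.poll.records is the
// property introduced by KIP-41. The value 100 below is illustrative only.
Properties props = new Properties();
props.put("bootstrap.servers", servers);        // same servers variable as in the listing above
props.put("group.id", "perf-test");
props.put("enable.auto.commit", "false");
props.put("max.partition.fetch.bytes", "8192"); // still bounds bytes per partition per fetch
props.put("max.poll.records", "100");           // new in 0.10: upper bound on records returned by each poll()
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);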