[ https://issues.apache.org/jira/browse/KAFKA-3627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264870#comment-15264870 ]
ASF GitHub Bot commented on KAFKA-3627:
---------------------------------------

GitHub user hachikuji opened a pull request:

    https://github.com/apache/kafka/pull/1295

    KAFKA-3627: consumer fails to execute delayed tasks in poll when records are available

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hachikuji/kafka KAFKA-3627

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1295

----
commit 04116c65369926955d79084851f6dfda313b5fee
Author: Jason Gustafson <ja...@confluent.io>
Date:   2016-04-29T21:58:35Z

    KAFKA-3627: consumer fails to execute delayed tasks in poll when records are available

----

> New consumer doesn't run delayed tasks while under load
> -------------------------------------------------------
>
>                 Key: KAFKA-3627
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3627
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.1
>            Reporter: Rob Underwood
>            Assignee: Jason Gustafson
>        Attachments: DelayedTaskBugConsumer.java, kafka-3627-output.log
>
>
> If the new consumer receives a steady flow of fetch responses, it will not run
> delayed tasks, which means it will not heartbeat or perform automatic offset
> commits.
> The main cause is the code that attempts to pipeline fetch responses and keep
> the consumer fed. Specifically, in KafkaConsumer::pollOnce() there is a
> check that skips calling client.poll() if there are fetched records ready
> (line 903 in the 0.9.0 branch as of this writing). Then in
> KafkaConsumer::poll(), if records are returned, it will initiate another fetch
> and perform a quick poll, which will send/receive fetch requests/responses
> but will not run delayed tasks.
> If the timing works out, and the consumer is consistently receiving fetched
> records, it won't run delayed tasks until it doesn't receive a fetch response
> during its quick poll. That leads to a rebalance, since the consumer isn't
> heartbeating, and typically means all the consumed records will be
> re-delivered, since the automatic offset commit wasn't able to run either.
> h5. Steps to reproduce
> # Start up a cluster with *at least 2 brokers*. This seems to be required to
> reproduce the issue, I'm guessing because the fetch responses all arrive
> together when using a single broker.
> # Create a topic with a good number of partitions
> #* bq. bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic delayed-task-bug --partitions 10 --replication-factor 1
> # Generate some test data so the consumer has plenty to consume. In this
> case I'm just using UUIDs:
> #* bq. for ((i=0;i<100;++i)); do cat /proc/sys/kernel/random/uuid >> /tmp/test-messages; done
> #* bq. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic delayed-task-bug < /tmp/test-messages
> # Start up a consumer with a small max fetch size to ensure it only pulls a
> few records at a time. The consumer can simply sleep for a moment when it
> receives a record.
> #* I'll attach an example in Java
> # There's a timing aspect to this issue, so it may take a few attempts to
> reproduce

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
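[Editor's note] The starvation mechanism described in the report (delayed tasks only run on the "full" poll path, which is skipped whenever fetched records are already buffered) can be illustrated with a toy simulation. This is plain Java, not the actual KafkaConsumer code; the class and method names here are invented for illustration only.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the reported behavior: delayed tasks (e.g. heartbeats)
// run only on the "full" poll path, which is skipped whenever fetched
// records are already buffered. All names here are hypothetical.
public class DelayedTaskStarvation {

    private final Queue<String> buffered = new ArrayDeque<>();
    private int heartbeats = 0; // how often the delayed "heartbeat" task ran

    // Stand-in for the short-circuit in pollOnce(): if records are
    // buffered, return one immediately without running delayed tasks.
    String pollOnce() {
        if (!buffered.isEmpty()) {
            return buffered.poll(); // quick path: delayed tasks skipped
        }
        heartbeats++;               // full path: delayed tasks run
        return null;
    }

    // Feed one fetched record per poll, mimicking a steady flow of
    // fetch responses. Returns how often the heartbeat task ran.
    static int simulate(int fetchResponses) {
        DelayedTaskStarvation consumer = new DelayedTaskStarvation();
        for (int i = 0; i < fetchResponses; i++) {
            consumer.buffered.add("record-" + i);
            consumer.pollOnce();
        }
        return consumer.heartbeats;
    }

    public static void main(String[] args) {
        // Under a steady record flow the heartbeat task never runs, which
        // in the real consumer leads to session timeout and a rebalance.
        System.out.println("heartbeats after 100 busy polls: " + simulate(100));
    }
}
```

In the simulation the buffer is refilled before every poll, so the quick path is taken every time and `heartbeats` stays at zero, which mirrors the rebalance-and-redelivery symptom described above.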