[ https://issues.apache.org/jira/browse/KAFKA-3627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264870#comment-15264870 ]

ASF GitHub Bot commented on KAFKA-3627:
---------------------------------------

GitHub user hachikuji opened a pull request:

    https://github.com/apache/kafka/pull/1295

    KAFKA-3627: consumer fails to execute delayed tasks in poll when records are available

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hachikuji/kafka KAFKA-3627

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1295
    
----
commit 04116c65369926955d79084851f6dfda313b5fee
Author: Jason Gustafson <ja...@confluent.io>
Date:   2016-04-29T21:58:35Z

    KAFKA-3627: consumer fails to execute delayed tasks in poll when records are available

----


> New consumer doesn't run delayed tasks while under load
> -------------------------------------------------------
>
>                 Key: KAFKA-3627
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3627
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.9.0.1
>            Reporter: Rob Underwood
>            Assignee: Jason Gustafson
>         Attachments: DelayedTaskBugConsumer.java, kafka-3627-output.log
>
>
> If the new consumer receives a steady flow of fetch responses, it will not run
> delayed tasks, which means it will not heartbeat or perform automatic offset
> commits.
> The main cause is the code that attempts to pipeline fetch responses and keep
> the consumer fed. Specifically, KafkaConsumer::pollOnce() contains a check
> that skips calling client.poll() if fetched records are already available
> (line 903 in the 0.9.0 branch as of this writing). Then, in
> KafkaConsumer::poll(), if records are returned, the consumer initiates another
> fetch and performs a quick poll, which sends and receives fetch
> requests/responses but does not run delayed tasks.
> If the timing works out and the consumer consistently receives fetched
> records, it won't run delayed tasks until a quick poll completes without
> receiving a fetch response. Because the consumer isn't heartbeating in the
> meantime, the coordinator eventually considers it dead and triggers a
> rebalance. This typically means all the consumed records will be redelivered,
> since the automatic offset commit wasn't able to run either.
> h5. Steps to reproduce
> # Start up a cluster with *at least 2 brokers*. This seems to be required to
> reproduce the issue. I'm guessing that's because the fetch responses all
> arrive together when using a single broker.
> # Create a topic with a good number of partitions (10 in this example)
> #* bq. bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic delayed-task-bug --partitions 10 --replication-factor 1
> # Generate some test data so the consumer has plenty to consume. In this
> case I'm just using UUIDs.
> #* bq. for ((i=0;i<100;++i)); do cat /proc/sys/kernel/random/uuid >> /tmp/test-messages; done
> #* bq. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic delayed-task-bug < /tmp/test-messages
> # Start up a consumer with a small max fetch size to ensure it only pulls a
> few records at a time. The consumer can simply sleep for a moment when it
> receives a record.
> #* An example in Java is attached (DelayedTaskBugConsumer.java).
> # There's a timing aspect to this issue, so it may take a few attempts to
> reproduce.
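The starvation described in the report can be illustrated with a small, self-contained Java model. Everything in it (the PollLoopModel class, its helpers, the logical clock, and the assumption that every in-flight fetch is answered immediately) is hypothetical and only mirrors the shape of the 0.9 pollOnce()/poll() logic as the report describes it; it is not Kafka code. The quickPollRunsDelayedTasks flag toggles one possible remedy, running due delayed tasks during the quick poll too, which is not necessarily the approach taken in PR #1295. The interval values are illustrative.

```java
import java.util.PriorityQueue;

// Hypothetical, heavily simplified model of the poll loop described in the report.
// None of these names are Kafka APIs; a logical clock replaces real time.
final class PollLoopModel {
    static final long HEARTBEAT_INTERVAL_MS = 3_000;   // illustrative values
    static final long AUTO_COMMIT_INTERVAL_MS = 5_000;
    static final long SESSION_TIMEOUT_MS = 30_000;
    static final int RECORDS_PER_FETCH = 5;

    // A delayed task that becomes due once the clock reaches deadlineMs.
    private static final class Task {
        final long deadlineMs;
        final boolean heartbeat; // true = heartbeat, false = auto-commit
        Task(long deadlineMs, boolean heartbeat) {
            this.deadlineMs = deadlineMs;
            this.heartbeat = heartbeat;
        }
    }

    private final boolean quickPollRunsDelayedTasks;
    private final PriorityQueue<Task> delayedTasks =
            new PriorityQueue<>((a, b) -> Long.compare(a.deadlineMs, b.deadlineMs));
    private long nowMs = 0;
    private int bufferedRecords = 0;
    private boolean fetchInFlight = false;
    int heartbeats = 0;
    int autoCommits = 0;
    long lastHeartbeatMs = 0;
    long maxHeartbeatGapMs = 0;

    PollLoopModel(boolean quickPollRunsDelayedTasks) {
        this.quickPollRunsDelayedTasks = quickPollRunsDelayedTasks;
        delayedTasks.add(new Task(HEARTBEAT_INTERVAL_MS, true));
        delayedTasks.add(new Task(AUTO_COMMIT_INTERVAL_MS, false));
    }

    // Steady-flow assumption: an in-flight fetch is always answered by the next network I/O.
    private void networkIo() {
        if (fetchInFlight) {
            bufferedRecords += RECORDS_PER_FETCH;
            fetchInFlight = false;
        }
    }

    // Runs every task whose deadline has passed and reschedules it for its next interval.
    private void runDueDelayedTasks() {
        while (!delayedTasks.isEmpty() && delayedTasks.peek().deadlineMs <= nowMs) {
            Task t = delayedTasks.poll();
            if (t.heartbeat) {
                heartbeats++;
                lastHeartbeatMs = nowMs;
                delayedTasks.add(new Task(nowMs + HEARTBEAT_INTERVAL_MS, true));
            } else {
                autoCommits++;
                delayedTasks.add(new Task(nowMs + AUTO_COMMIT_INTERVAL_MS, false));
            }
        }
    }

    // Like pollOnce(): the full poll (network I/O plus delayed tasks) is skipped
    // whenever records are already buffered.
    private int pollOnce() {
        if (bufferedRecords == 0) {
            fetchInFlight = true;
            networkIo();
            runDueDelayedTasks();
        }
        int records = bufferedRecords;
        bufferedRecords = 0;
        return records;
    }

    // Like poll(): after returning records, pipeline the next fetch with a quick poll.
    int poll() {
        int records = pollOnce();
        if (records > 0) {
            fetchInFlight = true;
            networkIo();                   // quick poll: network I/O only...
            if (quickPollRunsDelayedTasks) {
                runDueDelayedTasks();      // ...unless the hypothetical remedy is enabled
            }
        }
        return records;
    }

    // Calls poll() repeatedly, spending processingMs of logical time on each batch.
    void run(int iterations, long processingMs) {
        for (int i = 0; i < iterations; i++) {
            poll();
            nowMs += processingMs;
            maxHeartbeatGapMs = Math.max(maxHeartbeatGapMs, nowMs - lastHeartbeatMs);
        }
    }

    public static void main(String[] args) {
        for (boolean remedy : new boolean[] {false, true}) {
            PollLoopModel m = new PollLoopModel(remedy);
            m.run(1_000, 100); // 1,000 batches at 100 ms each = 100 s of logical time
            // evicted: the heartbeat gap exceeded the session timeout at some point
            System.out.printf("remedy=%b heartbeats=%d commits=%d maxGap=%dms evicted=%b%n",
                    remedy, m.heartbeats, m.autoCommits, m.maxHeartbeatGapMs,
                    m.maxHeartbeatGapMs > SESSION_TIMEOUT_MS);
        }
    }
}
```

In this model, running main() prints remedy=false heartbeats=0 commits=0 maxGap=100000ms evicted=true for the unmodified loop, because after the first call records are always buffered and the full poll is never reached again. With the flag enabled it prints remedy=true heartbeats=33 commits=19 maxGap=3000ms evicted=false. Running due tasks inside the quick poll is simply the most direct way to break the starvation in this toy model; a real fix might instead restructure pollOnce() or the network client.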

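For the consumer step of the reproduction, a configuration along the following lines could be used. This is a hedged sketch: the helper class ReproConsumerConfig, the group id, and the 256-byte fetch size are assumptions, not values taken from the attached DelayedTaskBugConsumer.java. The property keys are standard consumer settings; max.partition.fetch.bytes is the per-partition fetch limit in the 0.9 consumer. The sketch only uses java.util.Properties, so it compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka) that builds consumer settings for the reproduction.
final class ReproConsumerConfig {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Small per-partition fetch size so each fetch returns only a few UUID records.
        // 256 bytes is an assumed value; it must still exceed the size of a single record.
        props.put("max.partition.fetch.bytes", "256");
        // Auto-commit and heartbeats are the delayed tasks the report says get starved.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");  // illustrative value
        props.put("heartbeat.interval.ms", "3000");    // illustrative value
        props.put("session.timeout.ms", "30000");      // illustrative value
        return props;
    }

    public static void main(String[] args) {
        // "delayed-task-bug-group" is a hypothetical group id.
        Properties props = build("localhost:9092", "delayed-task-bug-group");
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

The resulting Properties could then be passed to new KafkaConsumer<String, String>(props), followed by subscribe(Collections.singletonList("delayed-task-bug")) and a poll loop that sleeps briefly for each record, as the steps above describe.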


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
