[ https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-8677:
---------------------------------
    Description: 
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]

 
*18:43:39* kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED
*18:44:00* kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout
*18:44:00* kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED
*18:44:00*     org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records

---------------------------

I found that this flaky test actually exposes a real bug in the consumer: within {{KafkaConsumer.poll}}, we have an optimization that tries to send the next fetch request before returning the data, in order to pipeline the fetch requests:

{code}
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        client.pollNoWakeup();
                    }

                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
{code}

As the NOTE says, this pollNoWakeup call should NOT throw any exceptions, since at this point the fetch position has already been updated. If an exception is thrown here and the caller decides to catch it and continue, those records will never be returned again, causing data loss.
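To illustrate the failure mode, here is a minimal toy sketch (the class and method names are hypothetical, not Kafka's actual API): the consumer advances its position before returning the batch, so an exception thrown during the pipelined poll drops records that can never be re-fetched if the caller catches and retries.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the hazard: position is advanced BEFORE the records are
// returned, so a throw from the pipelined poll loses the batch.
class ToyConsumer {
    private final List<String> log = List.of("r0", "r1", "r2");
    private int position = 0;            // the "consumed position"
    boolean failNextPipelinedPoll = false;

    List<String> poll() {
        List<String> records = new ArrayList<>(log.subList(position, log.size()));
        position = log.size();           // position already updated here
        if (!records.isEmpty()) {
            pipelinedPoll();             // may throw AFTER the position moved
        }
        return records;                  // never reached if pipelinedPoll throws
    }

    private void pipelinedPoll() {
        if (failNextPipelinedPoll) {
            failNextPipelinedPoll = false;
            throw new RuntimeException("error during pipelined fetch");
        }
    }
}

public class DataLossDemo {
    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.failNextPipelinedPoll = true;
        List<String> received = new ArrayList<>();
        try {
            received.addAll(consumer.poll());
        } catch (RuntimeException e) {
            // caller "captures and continues" -- the fetched batch is dropped
        }
        received.addAll(consumer.poll());  // position is already at the end
        System.out.println(received);      // prints [] -- r0..r2 are lost
    }
}
```

In the real consumer the fix is to guarantee that nothing after the position update can throw before the records are handed to the caller.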



> Flakey test 
> GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8677
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8677
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, security, unit tests
>    Affects Versions: 2.4.0
>            Reporter: Boyang Chen
>            Assignee: Anastasia Vela
>            Priority: Major
>              Labels: flaky-test
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
