Guozhang Wang created KAFKA-6085:
------------------------------------

             Summary: Streams rebalancing may cause a first batch of fetched 
records to be dropped
                 Key: KAFKA-6085
                 URL: https://issues.apache.org/jira/browse/KAFKA-6085
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.11.0.1
            Reporter: Guozhang Wang
            Assignee: Guozhang Wang
            Priority: Blocker
             Fix For: 1.0.0


This is a regression introduced in KAFKA-5152:

Assume you have one task without any state stores (and hence no restoration 
needed for that task), and a rebalance happens inside a {{records = 
pollRequests(pollTimeMs);}} call:

1. Call this `pollRequests` call A. Within call A the rebalance happens, which 
moves the thread state from RUNNING to PARTITION_REVOKED, and then from 
PARTITION_REVOKED to PARTITION_ASSIGNED. Assume the same task gets assigned 
again; this task will be in the initialized set of tasks but NOT in the running 
tasks yet.

2. Within the same call A, a fetch request may be sent and a response with a 
batch of records may come back, which `pollRequests` then returns. At this 
time the thread state has become PARTITION_ASSIGNED and the task is not 
"running" yet.

3. Now the bug comes in this line:

{{!records.isEmpty() && taskManager.hasActiveRunningTasks()}}

Since the task is not in the active running set yet, this returned set of 
records is skipped. Effectively these records are dropped on the floor and 
will never be consumed again.

4. In the next run loop, the same `pollRequests()` will be called again. Let's 
call it B. After B is called we set the thread state to RUNNING and put the 
task into the running task set. But at this point the previous batch of 
records will not be returned any more.

So the bug lies in the fact that, within a single run loop of the stream 
thread, we may complete a rebalance with tasks assigned but not yet 
initialized, AND fetch a batch of records for that not-initialized task and 
drop them on the floor.
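
The sequence in steps 1 to 4 can be reproduced with a toy model. This is only a 
minimal sketch and not the actual StreamThread code: the class 
`DroppedBatchSketch`, its fields, and the simplified `pollRequests()` and 
`runOnce()` are hypothetical stand-ins, and the `taskRunning` flag is assumed 
to play the role of `taskManager.hasActiveRunningTasks()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Toy model of the run loop described above; NOT the real Kafka Streams code.
class DroppedBatchSketch {
    enum State { RUNNING, PARTITION_REVOKED, PARTITION_ASSIGNED }

    State state = State.RUNNING;
    boolean taskRunning = true;      // assumed stand-in for hasActiveRunningTasks()
    boolean rebalancePending = true; // a rebalance fires inside the first poll
    final Deque<List<String>> fetched = new ArrayDeque<>(); // each batch is delivered once
    final List<String> processed = new ArrayList<>();

    // Hypothetical stand-in for pollRequests(pollTimeMs): a single call may
    // both complete a rebalance and return a freshly fetched batch.
    List<String> pollRequests() {
        if (rebalancePending) {
            rebalancePending = false;
            state = State.PARTITION_REVOKED;
            taskRunning = false;
            state = State.PARTITION_ASSIGNED; // same task re-assigned, not running yet
        }
        List<String> batch = fetched.poll();
        return batch == null ? Collections.<String>emptyList() : batch;
    }

    // One iteration of the simplified run loop.
    void runOnce() {
        boolean assignedBeforePoll = (state == State.PARTITION_ASSIGNED);
        List<String> records = pollRequests();
        if (assignedBeforePoll) {
            // Call B: the task only becomes "running" one iteration later.
            state = State.RUNNING;
            taskRunning = true;
        }
        if (!records.isEmpty() && taskRunning) {
            processed.addAll(records);
        }
        // Otherwise the batch is silently discarded and never fetched again.
    }

    static List<String> simulate() {
        DroppedBatchSketch thread = new DroppedBatchSketch();
        thread.fetched.add(Arrays.asList("r1", "r2")); // returned by call A
        thread.fetched.add(Arrays.asList("r3"));       // returned by call B
        thread.runOnce(); // call A: rebalance + fetch, batch dropped
        thread.runOnce(); // call B: task becomes running, only r3 is processed
        return thread.processed;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints [r3]; r1 and r2 are lost
    }
}
```

In this model the first batch is lost because the check on the running task 
set happens in the same iteration in which the rebalance completed, while the 
task is only moved to the running set in the following iteration.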

With further investigation I can confirm that this bug is also the root cause 
of the new flaky test tracked in 
https://issues.apache.org/jira/browse/KAFKA-5140. A recent PR, 
https://github.com/apache/kafka/pull/4086, exposed this bug by making the 
reset integration test fail more frequently.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
