A. Sophie Blee-Goldman created KAFKA-10651:
----------------------------------------------

             Summary: Assignor reports offsets from uninitialized task
                 Key: KAFKA-10651
                 URL: https://issues.apache.org/jira/browse/KAFKA-10651
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.6.0
            Reporter: A. Sophie Blee-Goldman
             Fix For: 2.7.0, 2.6.1


In KIP-441, the new HA assignor makes an informed decision about stateful task 
placement based on the offset sums reported by each instance. Offset sums are 
computed in one of two ways: for assigned tasks (i.e. those in the TaskManager's 
"tasks" map), it just sums up the tasks' changelog offsets directly. For 
tasks that are not assigned but whose directory remains on disk, it reads the 
changelog offsets from the checkpoint file. The offset sums are encoded in the 
subscription userdata sent during the JoinGroup phase of a rebalance.
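
To make the two paths concrete, here is a minimal sketch. It is not the actual 
Streams code: the class name, method names, and the plain-map representation of 
tasks and checkpoint files are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not the real TaskManager: task id -> (changelog partition -> offset).
final class OffsetSumSketch {
    // Sum the offsets of all changelog partitions of one task.
    static long sum(Map<String, Long> changelogOffsets) {
        long total = 0L;
        for (long offset : changelogOffsets.values()) {
            total += offset;
        }
        return total;
    }

    static Map<String, Long> taskOffsetSums(
            Map<String, Map<String, Long>> assignedTaskOffsets,
            Map<String, Map<String, Long>> checkpointedOffsets) {
        Map<String, Long> sums = new HashMap<>();
        // Path 1: assigned tasks report the offsets they hold in memory.
        for (Map.Entry<String, Map<String, Long>> e : assignedTaskOffsets.entrySet()) {
            sums.put(e.getKey(), sum(e.getValue()));
        }
        // Path 2: unassigned tasks with a directory on disk fall back to the checkpoint file.
        for (Map.Entry<String, Map<String, Long>> e : checkpointedOffsets.entrySet()) {
            if (!assignedTaskOffsets.containsKey(e.getKey())) {
                sums.put(e.getKey(), sum(e.getValue()));
            }
        }
        return sums;
    }
}
```

Note that in this model an assigned task always takes the first path, even if a 
checkpoint file for it also exists on disk.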

The problem here is that it's possible for the instance to rejoin the group 
after having been assigned a new task, but before that task is initialized. In 
this case it would not compute the offset sum from the checkpoint file but 
instead from the uninitialized task, causing it to skip reporting any offsets 
for that task whatsoever.
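
The failure mode can be illustrated with a small sketch. Again, this is only a 
model under assumed names (reportedOffsetSum and its parameters are 
hypothetical), not the code in Streams: an assigned task is always asked for 
its own offsets, and an uninitialized task has none to give.

```java
import java.util.Optional;

// Hypothetical model of the reporting decision for a single task.
final class UninitializedTaskSketch {
    static Optional<Long> reportedOffsetSum(
            boolean assigned,
            boolean initialized,
            long inMemoryOffsetSum,
            Optional<Long> checkpointOffsetSum) {
        if (assigned) {
            // The checkpoint file is never consulted for an assigned task. If the task
            // has not been initialized yet, nothing is reported for it at all.
            return initialized ? Optional.of(inMemoryOffsetSum) : Optional.empty();
        }
        // Unassigned task: report whatever the checkpoint file on disk says, if any.
        return checkpointOffsetSum;
    }
}
```

With a checkpoint sum of 100 on disk, the unassigned case reports 100, while 
the assigned-but-uninitialized case reports nothing.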

This results in a particularly nefarious interaction between HA and cooperative 
rebalancing. An instance may read from the checkpoint file of a caught-up (but 
unassigned) task and report this in its subscription, leading the assignor to 
compute a small lag and place this task on the instance. After placing all 
stateful tasks in this way, it will distribute the stateless tasks across the 
group to balance the overall workload. It does this without considering the 
previous owner of the stateless tasks, so odds are good that moving the 
stateful task to this instance will result in a different assortment of 
stateless tasks in this rebalance.

Any time owned tasks are moved around, the current owner will have to revoke 
them and trigger a followup cooperative rebalance. Within the Consumer client, 
this actually happens immediately: that is, within an invocation of poll() it 
will loop inside joinGroupIfNeeded() as long as a rejoin is needed. And at the 
end of the last rebalance, if any partitions are revoked then a rejoin will 
indeed be needed. So the Consumer will send out its next JoinGroup – including 
the userdata with computed task offset sums – without first exiting from the 
current poll(). Streams never gets the chance to initialize its new tasks, and 
ends up excluding them from the offset sums it reports in the following 
rebalance.
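
The ordering problem can be sketched as a simple event trace. This is a toy 
model of the control flow described above, not the Consumer implementation; 
the class, the simulate method, and the event strings are all made up for 
illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: the join loop runs to completion before poll() returns to Streams.
final class RejoinLoopSketch {
    // rebalancesWithRevocation: how many rebalances end with partitions being revoked.
    static List<String> simulate(int rebalancesWithRevocation) {
        List<String> events = new ArrayList<>();
        int round = 0;
        boolean rejoinNeeded = true;
        // Models the loop inside joinGroupIfNeeded(): keep rejoining while needed.
        while (rejoinNeeded) {
            round++;
            // The subscription userdata (task offset sums) is computed at this point.
            events.add("JoinGroup " + round);
            // A rebalance that revoked partitions requires a followup rebalance.
            rejoinNeeded = round <= rebalancesWithRevocation;
        }
        events.add("poll returns");
        // Only now would Streams get a chance to initialize newly assigned tasks.
        events.add("initialize tasks");
        return events;
    }
}
```

For simulate(1) the trace is JoinGroup 1, JoinGroup 2, poll returns, initialize 
tasks: the second JoinGroup is sent before any task assigned in the first 
rebalance has been initialized.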

And since it doesn't report any offsets for this task, the assignor now 
believes the instance does _not_ have any caught-up state for this task, and 
assigns the task elsewhere. This causes a shuffling of stateless tasks once 
more, which in turn results in another cooperative rebalance. This time the 
task is no longer assigned, so the instance reports offsets based on the 
checkpoint file again, and we're back at the beginning.

Given the deterministic assignment, once a group is caught up in this cycle it 
will be impossible to escape it without manual intervention.
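
The cycle can be reduced to a two-state toy model. This is a deliberate 
simplification with hypothetical names: it tracks a single stateful task on a 
single instance, and assumes every rebalance is followed immediately by another 
one, as described above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the oscillation for one task on one instance.
final class RebalanceCycleSketch {
    // Returns, for each rebalance, whether the task ended up assigned to the instance.
    static List<Boolean> simulate(int rebalances) {
        List<Boolean> assignedAfterEach = new ArrayList<>();
        boolean assignedHere = false; // caught-up checkpoint on disk, task not assigned
        for (int i = 0; i < rebalances; i++) {
            // Unassigned: offsets come from the checkpoint file and get reported.
            // Assigned but uninitialized: no offsets are reported for the task.
            boolean reportsOffsets = !assignedHere;
            // The assignor places the task here only if the instance looks caught up.
            assignedHere = reportsOffsets;
            assignedAfterEach.add(assignedHere);
        }
        return assignedAfterEach;
    }
}
```

In this model simulate(4) yields [true, false, true, false]; the assignment 
flips on every rebalance and never settles.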



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
