[ https://issues.apache.org/jira/browse/KAFKA-10651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
A. Sophie Blee-Goldman reassigned KAFKA-10651:
----------------------------------------------
Assignee: A. Sophie Blee-Goldman
> Assignor reports offsets from uninitialized task
> ------------------------------------------------
>
> Key: KAFKA-10651
> URL: https://issues.apache.org/jira/browse/KAFKA-10651
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.0
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Blocker
> Fix For: 2.7.0, 2.6.1
>
>
> In KIP-441, the new HA assignor makes an informed decision about stateful
> task placement based on the offset sums reported by each instance. Offset
> sums are computed one of two ways: for assigned tasks (i.e., those in the
> TaskManager's "tasks" map), the instance simply sums the tasks' changelog offsets
> directly. For tasks that are not assigned but whose directory remains on
> disk, it reads the changelog offsets from the checkpoint file. This is
> encoded with the subscription userdata sent during the JoinGroup phase of a
> rebalance.
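A minimal Java sketch of the two ways an instance computes offset sums, as described above. It is a toy model, not the real TaskManager code: task IDs are plain strings, changelog offsets are a map from a hypothetical partition name to offset, and the class and method names (`OffsetSumSketch`, `computeOffsetSums`) are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of per-task offset sums. Task IDs are plain strings and
// changelog offsets are keyed by partition name (illustrative assumptions,
// not the real Kafka Streams types).
final class OffsetSumSketch {

    // Add up every changelog offset recorded for a single task.
    static long sum(Map<String, Long> changelogOffsets) {
        long total = 0L;
        for (long offset : changelogOffsets.values()) {
            total += offset;
        }
        return total;
    }

    // assignedTaskOffsets: live changelog offsets of tasks in the "tasks" map.
    // checkpointedOffsets: offsets read from the checkpoint files of task
    // directories that remain on disk.
    static Map<String, Long> computeOffsetSums(
            Map<String, Map<String, Long>> assignedTaskOffsets,
            Map<String, Map<String, Long>> checkpointedOffsets) {
        Map<String, Long> sums = new HashMap<>();
        // 1. Assigned tasks: sum their current changelog offsets directly.
        for (Map.Entry<String, Map<String, Long>> e : assignedTaskOffsets.entrySet()) {
            sums.put(e.getKey(), sum(e.getValue()));
        }
        // 2. Unassigned tasks with a directory on disk: use the checkpoint file.
        //    putIfAbsent keeps the live value for tasks that are also assigned.
        for (Map.Entry<String, Map<String, Long>> e : checkpointedOffsets.entrySet()) {
            sums.putIfAbsent(e.getKey(), sum(e.getValue()));
        }
        return sums;
    }
}
```

For example, with task `0_0` assigned (offset 100) and `0_1` only checkpointed on disk (offsets 40 and 2), the sums come out as 100 and 42.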
> The problem here is that it's possible for the instance to rejoin the group
> after having been assigned a new task, but before that task is initialized.
> In this case it would not compute the offset sum from the checkpoint file but
> instead from the uninitialized task, causing it to skip reporting any offsets
> for that task whatsoever.
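The failure mode can be sketched by adding an "initialized" flag to a toy model of the tasks map. `buggyOffsetSums` mirrors the behavior reported here, where membership in the tasks map alone decides that the checkpoint file is skipped; `fixedOffsetSums` shows one plausible remedy, falling back to the checkpoint file for uninitialized tasks. That remedy is an assumption for illustration, not necessarily the change that resolved this ticket, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model contrasting the reported behavior with one assumed fix.
// All names here are hypothetical, not Kafka Streams internals.
final class UninitializedTaskSketch {

    // Stand-in for an entry in the "tasks" map. Its changelog offsets are
    // only known once the task has been initialized.
    static final class AssignedTask {
        final boolean initialized;
        final Map<String, Long> changelogOffsets;

        AssignedTask(boolean initialized, Map<String, Long> changelogOffsets) {
            this.initialized = initialized;
            this.changelogOffsets = changelogOffsets;
        }
    }

    static long sum(Map<String, Long> offsets) {
        long total = 0L;
        for (long offset : offsets.values()) {
            total += offset;
        }
        return total;
    }

    // Behavior described in the issue: every task in the "tasks" map bypasses
    // the checkpoint file, so an uninitialized task reports nothing at all.
    static Map<String, Long> buggyOffsetSums(Map<String, AssignedTask> tasks,
                                             Map<String, Map<String, Long>> checkpoints) {
        Map<String, Long> sums = new HashMap<>();
        for (Map.Entry<String, AssignedTask> e : tasks.entrySet()) {
            if (e.getValue().initialized) {
                sums.put(e.getKey(), sum(e.getValue().changelogOffsets));
            }
        }
        for (Map.Entry<String, Map<String, Long>> e : checkpoints.entrySet()) {
            if (!tasks.containsKey(e.getKey())) {
                sums.put(e.getKey(), sum(e.getValue()));
            }
        }
        return sums;
    }

    // Assumed fix: an uninitialized task falls back to its checkpoint file,
    // exactly as if it were unassigned.
    static Map<String, Long> fixedOffsetSums(Map<String, AssignedTask> tasks,
                                             Map<String, Map<String, Long>> checkpoints) {
        Map<String, Long> sums = new HashMap<>();
        for (Map.Entry<String, AssignedTask> e : tasks.entrySet()) {
            if (e.getValue().initialized) {
                sums.put(e.getKey(), sum(e.getValue().changelogOffsets));
            }
        }
        for (Map.Entry<String, Map<String, Long>> e : checkpoints.entrySet()) {
            AssignedTask task = tasks.get(e.getKey());
            if (task == null || !task.initialized) {
                sums.put(e.getKey(), sum(e.getValue()));
            }
        }
        return sums;
    }
}
```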
> This results in a particularly nefarious interaction between HA and
> cooperative rebalancing. An instance may read from the checkpoint file of a
> caught-up (but unassigned) task and report this in its subscription, leading
> the assignor to compute a small lag and place this task on the instance.
> After placing all stateful tasks in this way, it will distribute the
> stateless tasks across the group to balance the overall workload. It does
> this without considering the previous owner of the stateless tasks, so odds
> are good that moving the stateful task to this instance will result in a
> different assortment of stateless tasks in this rebalance.
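To see why a reported (or missing) offset sum drives placement, here is a toy version of lag-based placement for a single stateful task. Treating an unreported task as having no local state (lag equal to the full end-offset sum) and breaking ties by lowest client ID are simplifications chosen for this sketch; the real HA assignor considers more than lag alone, and the names below are hypothetical.

```java
import java.util.Collection;
import java.util.Map;
import java.util.TreeSet;

// Toy lag-based placement of a single stateful task; a simplification of
// what the issue describes, not the real HA assignor.
final class LagPlacementSketch {

    // ASSUMPTION: a client that reports no offset sum for the task is treated
    // as having no local state, so its lag is the task's full end-offset sum.
    static long lag(long endOffsetSum, Long reportedOffsetSum) {
        if (reportedOffsetSum == null) {
            return endOffsetSum;
        }
        return Math.max(0L, endOffsetSum - reportedOffsetSum);
    }

    // Place the task on the client with the smallest lag. Clients are visited
    // in sorted order and only a strictly smaller lag replaces the current
    // choice, so ties go to the lowest client ID and the result is deterministic.
    static String placeStatefulTask(long endOffsetSum,
                                    Map<String, Long> reportedOffsetSums,
                                    Collection<String> clients) {
        String best = null;
        long bestLag = Long.MAX_VALUE;
        for (String client : new TreeSet<>(clients)) {
            long clientLag = lag(endOffsetSum, reportedOffsetSums.get(client));
            if (clientLag < bestLag) {
                best = client;
                bestLag = clientLag;
            }
        }
        return best;
    }
}
```

An instance that read 990 of 1000 offsets from a checkpoint file gets a lag of 10 and wins the task; the same instance reporting nothing would be treated as 1000 behind.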
> Any time owned tasks are moved around, the current owner will have to revoke
> them and trigger a followup cooperative rebalance. Within the Consumer
> client, this actually happens immediately: that is, within an invocation of
> poll() it will loop inside joinGroupIfNeeded() as long as a rejoin is needed.
> And at the end of the last rebalance, if any partitions are revoked then a
> rejoin will indeed be needed. So the Consumer will send out its next
> JoinGroup – including the userdata with computed task offset sums – without
> first exiting from the current poll(). Streams never gets the chance to
> initialize its new tasks, and ends up excluding them from the offset sums it
> reports in the following rebalance.
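The timing problem can be modeled with a toy poll loop. Each hypothetical `Rebalance` stands in for one JoinGroup round and returns true when partitions were revoked, so another rejoin is needed; the loop records which tasks were initialized at the moment each JoinGroup was sent. This is a simplification under stated assumptions, not the actual `KafkaConsumer` or coordinator code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the timing problem; not the real KafkaConsumer or
// ConsumerCoordinator code. Each Rebalance stands in for one JoinGroup round
// and returns true if it revoked partitions, i.e. a rejoin is needed.
final class PollLoopSketch {

    interface Rebalance {
        boolean run(Set<String> assignedTasks);
    }

    final Set<String> assignedTasks = new HashSet<>();
    final Set<String> initializedTasks = new HashSet<>();
    // Snapshot of the initialized tasks at the moment each JoinGroup is sent,
    // i.e. what the offset sums in that subscription could be based on.
    final List<Set<String>> initializedAtEachJoin = new ArrayList<>();

    void poll(List<Rebalance> rounds) {
        boolean rejoinNeeded = true;
        int round = 0;
        // Keep rejoining inside this single poll() while a rejoin is needed.
        while (rejoinNeeded && round < rounds.size()) {
            initializedAtEachJoin.add(new HashSet<>(initializedTasks));
            rejoinNeeded = rounds.get(round++).run(assignedTasks);
        }
        // Only once poll() returns does Streams get to initialize new tasks.
        initializedTasks.addAll(assignedTasks);
    }
}
```

In a run where the first round assigns task `0_1` and revokes partitions, the second JoinGroup goes out with `0_1` still uninitialized; the task is initialized only after `poll()` returns.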
> And since it doesn't report any offsets for this task, the assignor now
> believes the instance does _not_ have any caught up state for this task, and
> assigns the task elsewhere. This causes a shuffling of stateless tasks once
> more, which in turn results in another cooperative rebalance. This time the
> task is no longer assigned so the instance reports offsets based on the
> checkpoint file again, and we're back at the beginning.
> Given the deterministic assignment, once a group is caught up in this cycle
> it will be impossible to escape it without manual intervention.
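The cycle reduces to a two-state loop. In this deterministic toy model, instance A reports offsets for task T only while T is not assigned to it, and the assignor gives T to A exactly when A reports caught-up offsets. The `checkpointFallback` flag models an assumed remedy (reading the checkpoint file even for an uninitialized assigned task), included only for contrast; all names are hypothetical.

```java
// Toy simulation of the cycle: instance A holds caught-up, checkpointed state
// for a task T. A simplified, deterministic model; not the real assignor.
final class RebalanceCycleSketch {

    // What A reports for T in its subscription. Without the fallback, A
    // reports nothing while T is assigned but uninitialized (the bug) and
    // reports its checkpoint offsets while T is unassigned. With the assumed
    // fallback, A always reports its checkpoint offsets.
    static boolean aReportsOffsetsForT(boolean tAssignedToA, boolean checkpointFallback) {
        return checkpointFallback || !tAssignedToA;
    }

    // Deterministic assignor: T goes to A exactly when A reported caught-up
    // offsets for it; otherwise it goes to another instance.
    static boolean assignTToA(boolean aReportedOffsets) {
        return aReportedOffsets;
    }

    // Returns the number of rebalances until the assignment stops moving, or
    // -1 if it is still moving after maxRounds. Every move of T reshuffles the
    // stateless tasks, forcing a revocation and another cooperative rebalance.
    static int rebalancesUntilStable(int maxRounds, boolean tInitiallyAssignedToA,
                                     boolean checkpointFallback) {
        boolean assigned = tInitiallyAssignedToA;
        for (int round = 1; round <= maxRounds; round++) {
            boolean next = assignTToA(aReportsOffsetsForT(assigned, checkpointFallback));
            if (next == assigned) {
                return round;
            }
            assigned = next;
        }
        return -1;
    }
}
```

Without the fallback, `rebalancesUntilStable(1000, false, false)` returns -1 because ownership of T flips every round; with it, the assignment stops moving in the second rebalance.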
--
This message was sent by Atlassian Jira
(v8.3.4#803005)