[
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sabit updated KAFKA-16025:
--------------------------
Description:
Hello,
We are encountering an issue where during rebalancing, we see streams threads
on one client get stuck in rebalancing. Upon enabling debug logs, we saw that
some tasks were having issues initializing due to failure to grab a lock in the
StateDirectory:
{{2023-12-14 22:51:57.352000Z stream-thread
[i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since:
stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed
to lock the state directory for task 0_51; will retry}}
We were able to reproduce this behavior reliably on 3.4.0. This is the sequence
that triggers the bug.
Assume in a streams consumer group, there are 5 instances (A, B, C, D, E), each
with 5 threads (1-5), and the consumer is using stateful tasks which have state
stores on disk. There are 10 active tasks and 10 standby tasks.
# Instance A is deactivated
# As an example, let's say task 0_1, previously on instance B, moves to
instance C
# Task 0_1 leaves behind its state directory on Instance B's disk, currently
unused, and no lock for it exists in Instance B's StateDirectory in-memory lock
tracker
# Instance A is re-activated
# Streams thread 1 on Instance B is asked to re-join the consumer group due to
a new member being added
# As part of re-joining, thread 1 lists non-empty state directories in order
to report the offsets it has in its state stores as part of its metadata.
Thread 1 sees that the directory for 0_1 is not empty.
# The cleanup thread on instance B runs. The cleanup thread locks state store
0_1, sees the directory for 0_1 was last modified more than
`state.cleanup.delay.ms` ago, deletes it, and unlocks it successfully
# Thread 1 takes a lock on directory 0_1 because it found the directory
non-empty earlier, unaware that the cleanup has run between the time of the
check and the lock. It tracks this lock in its own in-memory store, in addition
to StateDirectory's in-memory lock store
# Thread 1 successfully joins the consumer group
# After every consumer in the group joins the group, assignments are
calculated, and then every consumer calls sync group to receive the new
assignments
# Thread 1 on Instance B calls sync group but gets an error - the group
coordinator has triggered a new rebalance and all members must rejoin the group
# Thread 1 again lists non-empty state directories in order to report the
offsets it has in its state stores as part of its metadata. Prior to doing
so, it clears its in-memory store tracking the locks it has taken for the
purpose of gathering rebalance metadata
# Thread 1 no longer takes a lock on 0_1 because the directory is now empty
# However, that lock on 0_1 owned by Thread 1 remains in StateDirectory
# All consumers re-join and sync successfully, receiving their new assignments
# Thread 2 on Instance B is assigned task 0_1
# Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is still
being held by Thread 1
# Thread 2 remains in rebalancing state, and cannot make progress on task 0_1,
or any other tasks assigned to it.
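The sequence above can be replayed in a small single-threaded model. This is only a sketch under stated assumptions: `LockTable` and `ThreadView` are hypothetical stand-ins for StateDirectory's in-memory lock tracker and the stream thread's own lock tracking, not Kafka's actual classes, and the interleaving is fixed to the order given in the list.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the reported race; none of these types are Kafka's.
class OrphanedLockSketch {

    /** Stand-in for StateDirectory's in-memory lock tracker: task id -> owning thread. */
    static final class LockTable {
        private final Map<String, String> owners = new ConcurrentHashMap<>();

        /** Succeeds if the task is unlocked or already owned by the same thread. */
        boolean lock(String taskId, String thread) {
            String owner = owners.putIfAbsent(taskId, thread);
            return owner == null || owner.equals(thread);
        }

        /** Releases the lock only if the given thread owns it. */
        void unlock(String taskId, String thread) {
            owners.remove(taskId, thread);
        }

        String ownerOf(String taskId) {
            return owners.get(taskId);
        }
    }

    /** Stand-in for a stream thread's own record of locks taken for rebalance metadata. */
    static final class ThreadView {
        final String name;
        final Set<String> lockedForMetadata = new HashSet<>();

        ThreadView(String name) {
            this.name = name;
        }
    }

    /** Replays the listed steps; returns whether thread 2 can lock task 0_1 at the end. */
    static boolean replay(LockTable locks) {
        Set<String> nonEmptyDirs = new HashSet<>(Set.of("0_1"));
        ThreadView t1 = new ThreadView("StreamThread-1");

        // Thread 1 lists non-empty directories while re-joining (the "check").
        Set<String> seenByT1 = new HashSet<>(nonEmptyDirs);

        // The cleanup thread locks 0_1, deletes its contents, and unlocks it.
        if (locks.lock("0_1", "CleanupThread")) {
            nonEmptyDirs.remove("0_1");
            locks.unlock("0_1", "CleanupThread");
        }

        // Thread 1 locks what it saw earlier (the "lock"), acting on a stale listing.
        for (String task : seenByT1) {
            if (locks.lock(task, t1.name)) {
                t1.lockedForMetadata.add(task);
            }
        }

        // Sync group fails; thread 1 re-joins and clears its own tracking
        // without releasing the entries held in the shared lock table.
        t1.lockedForMetadata.clear();
        for (String task : nonEmptyDirs) { // 0_1 is now empty, so nothing is re-locked
            if (locks.lock(task, t1.name)) {
                t1.lockedForMetadata.add(task);
            }
        }

        // Thread 2 is assigned 0_1 and tries to lock it.
        return locks.lock("0_1", "StreamThread-2");
    }

    public static void main(String[] args) {
        LockTable locks = new LockTable();
        System.out.println("thread 2 acquired lock: " + replay(locks));
        System.out.println("lock on 0_1 held by: " + locks.ownerOf("0_1"));
    }
}
```

In this model thread 2 fails to acquire the lock and the table still lists StreamThread-1 as the owner of 0_1, even though thread 1 no longer tracks it, which matches the orphaned-lock state described in the last steps of the list.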
was:
Hello,
We are encountering an issue where during rebalancing, we see streams threads
on one client get stuck in rebalancing. Upon enabling debug logs, we saw that
some tasks were having issues initializing due to failure to grab a lock in the
StateDirectory:
{{2023-12-14 22:51:57.352000Z stream-thread
[i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since:
stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed
to lock the state directory for task 0_51; will retry}}
Reading the comments for TaskManager, this seems like an expected case, however
in our situation, the thread would get stuck on this permanently. After looking
at the logs, we came to understand that whenever tasks 0_51, 0_37, 0_107,
0_219, or 0_93 were being assigned to this client, the assigned threads would
get stuck due to being unable to grab the lock. We took a heap dump of this JVM
and found that all of these tasks were being locked by StreamThread-21 (see
attachment). Additionally, each of these task directories exists on the client
but is empty.
The sequence of events that occurred for us to arrive at this state is that
initially, all of these tasks were being processed on the impacted client,
either as active or standby tasks. We had one client drop out of the consumer
group, so these tasks were rebalanced away from the client. When we try to
bring up a new client to replace the one that dropped out, the impacted client
cannot initialize these 5 tasks it was initially processing. Sample of one
timeline:
{{# Task moved away from the original consumer thread}}
{{2023-12-13 22:45:58.240000Z stream-thread
[i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Suspended running}}
{{2023-12-13 22:45:58.263000Z stream-thread
[i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Closed clean}}
{{# Directory cleaned up}}
{{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread]
Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed
(cleanup delay is 600000ms).}}
{{# Cannot initialize task when it is re-assigned to this client}}
{{2023-12-14 22:51:57.352000Z stream-thread
[i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since:
stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed
to lock the state directory for task 0_51; will retry}}
Reading through the StateDirectory, it wasn't immediately obvious how we
could arrive in a situation where the task is locked by a thread it hadn't been
attempted to be assigned to yet, while the directory was cleaned up, but is now
empty instead of being deleted. We didn't observe any filesystem issues on this
client around this time either.
> Streams StateDirectory has orphaned locks after rebalancing, blocking future
> rebalancing
> ----------------------------------------------------------------------------------------
>
> Key: KAFKA-16025
> URL: https://issues.apache.org/jira/browse/KAFKA-16025
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.4.0
> Environment: Linux
> Reporter: Sabit
> Priority: Major
> Attachments: Screenshot 1702750558363.png
>
>