Navinder Brar created KAFKA-9169:
------------------------------------
Summary: Standby Tasks ask for incorrect offsets on resuming
post suspension
Key: KAFKA-9169
URL: https://issues.apache.org/jira/browse/KAFKA-9169
Project: Kafka
Issue Type: New Feature
Components: streams
Reporter: Navinder Brar
In versions (check 2.0) where standby tasks are suspended on each rebalance, the
checkpoint file is updated after the flush. The expected behaviour is that if the
same standby task gets assigned back to the same machine after the rebalance, it
resumes reading the changelog from the offset where it left off.
But there appears to be a bug in the code: after every rebalance, the task starts
reading from the offset at which it started the first time it was assigned to
this machine. This has 2 repercussions:
# After every rebalance, the standby tasks restore a huge amount of data that
they have already restored earlier (verified via a 300x increase in network IO on
all Streams instances after a rebalance, even when the assignment did not change).
# If the changelog has time-based retention, those offsets may already have been
deleted from the changelog, which leads to OffsetOutOfRangeException, and the
stores get deleted and recreated.
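Both repercussions can be illustrated with a toy model. This is a hypothetical sketch, not Kafka code: `ToyChangelog`, `ResumeResult` and `StandbyResume` are made-up names, and the changelog is reduced to a log-start and log-end offset. Resuming from a stale but still-retained offset re-reads everything after it (repercussion 1); resuming from an offset that retention has already deleted forces a wipe and full restore of whatever is still retained (repercussion 2).

```java
/** Hypothetical in-memory changelog partition; not a Kafka API. */
final class ToyChangelog {
    private long logStartOffset = 0L; // first offset still retained
    private long logEndOffset = 0L;   // offset of the next record to be appended

    void append(long records) { logEndOffset += records; }

    /** Simulates time-based retention deleting all records below newStart. */
    void applyRetention(long newStart) {
        logStartOffset = Math.max(logStartOffset, Math.min(newStart, logEndOffset));
    }

    long logStartOffset() { return logStartOffset; }
    long logEndOffset() { return logEndOffset; }
}

/** Outcome of resuming a standby from a given offset (hypothetical type). */
final class ResumeResult {
    final boolean storeWiped;    // true if the resume offset was no longer retained
    final long recordsToRestore; // records fetched to catch up to the log end

    ResumeResult(boolean storeWiped, long recordsToRestore) {
        this.storeWiped = storeWiped;
        this.recordsToRestore = recordsToRestore;
    }
}

final class StandbyResume {
    static ResumeResult resume(ToyChangelog log, long resumeOffset) {
        if (resumeOffset >= log.logStartOffset() && resumeOffset <= log.logEndOffset()) {
            // Offset still retained: restore only the records after it.
            return new ResumeResult(false, log.logEndOffset() - resumeOffset);
        }
        // Offset deleted by retention: a real consumer would surface an
        // OffsetOutOfRangeException; here we model wiping the store and
        // restoring everything that is still retained.
        return new ResumeResult(true, log.logEndOffset() - log.logStartOffset());
    }
}
```

For example, with 1,000,000 records in the changelog and a last checkpoint at 999,000, resuming from the stale first-assignment offset 0 re-reads all 1,000,000 records instead of 1,000; once retention has advanced the log start to 400,000, the stale offset triggers a wipe and a 600,000-record restore.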
I have gone through the code and I think I have found the cause.
In TaskManager#updateNewAndRestoringTasks(), assignStandbyPartitions() is called
for all running standby tasks. It populates the checkpointedOffsets map from
standbyTask.checkpointedOffsets(), which is only set when a StandbyTask is
initialized (i.e. in its constructor).
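The stale-snapshot pattern described above can be reproduced in a few lines. This is a simplified, hypothetical model, not the actual Kafka Streams 2.0 source: `ToyStateManager`, `ToyStandbyTask`, `BuggyTaskManager` and the plain `String` partition keys are stand-ins; only the method names `checkpointed()`, `checkpointedOffsets()` and `assignStandbyPartitions()` mirror the ones named in this report.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for the state manager: its checkpoint is refreshed on every commit. */
final class ToyStateManager {
    private final Map<String, Long> checkpoint = new HashMap<>();

    ToyStateManager(Map<String, Long> initial) { checkpoint.putAll(initial); }

    /** Called on commit, after the stores are flushed. */
    void commit(String partition, long offset) { checkpoint.put(partition, offset); }

    /** Copy of the latest checkpointed offsets. */
    Map<String, Long> checkpointed() { return new HashMap<>(checkpoint); }
}

/** Stand-in for a standby task that snapshots offsets only in its constructor. */
final class ToyStandbyTask {
    final ToyStateManager stateMgr;
    private final Map<String, Long> checkpointedOffsets;

    ToyStandbyTask(ToyStateManager stateMgr) {
        this.stateMgr = stateMgr;
        // Snapshot taken once and never refreshed afterwards (the reported bug).
        this.checkpointedOffsets = stateMgr.checkpointed();
    }

    Map<String, Long> checkpointedOffsets() { return checkpointedOffsets; }
}

final class BuggyTaskManager {
    /** Mirrors the reported behaviour: resume offsets come from the stale snapshot. */
    static Map<String, Long> assignStandbyPartitions(ToyStandbyTask task) {
        return new HashMap<>(task.checkpointedOffsets());
    }
}
```

If the task is created when the checkpoint is at offset 100 and later commits offset 5000, `assignStandbyPartitions()` still hands back 100 after a rebalance, even though `stateMgr.checkpointed()` already reports 5000.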
This has an easy fix.
After resumption, we read standbyTask.checkpointedOffsets() to determine the
offset from which the standby task should resume, instead of
stateMgr.checkpointed(), which is updated on every commit to the checkpoint
file. With the former, the task always restarts from the same offset, re-reading
records it has already restored, and when the changelog topic has a retention
time, it fails with an OffsetOutOfRangeException. So
standbyTask.checkpointedOffsets() is effectively useless after construction, and
we should use stateMgr.checkpointed() instead to return offsets to the task
manager.
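A minimal sketch of the proposed fix, under a hypothetical model (class names `FixStateManager`, `FixedStandbyTask` and `FixedTaskManager` are illustrative, not Kafka Streams source): instead of caching a constructor-time copy, the task delegates to the state manager's current `checkpointed()` view, so a resumed standby continues from its last committed offset.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical state manager whose checkpoint is updated on every commit. */
final class FixStateManager {
    private final Map<String, Long> checkpoint = new HashMap<>();

    void commit(String partition, long offset) { checkpoint.put(partition, offset); }

    /** Copy of the latest checkpointed offsets. */
    Map<String, Long> checkpointed() { return new HashMap<>(checkpoint); }
}

/** Hypothetical standby task after the fix: no cached snapshot. */
final class FixedStandbyTask {
    private final FixStateManager stateMgr;

    FixedStandbyTask(FixStateManager stateMgr) { this.stateMgr = stateMgr; }

    /** Reads the latest checkpoint on every call instead of a constructor-time copy. */
    Map<String, Long> checkpointedOffsets() { return stateMgr.checkpointed(); }
}

final class FixedTaskManager {
    /** Resume offsets now reflect the most recent commit. */
    static Map<String, Long> assignStandbyPartitions(FixedStandbyTask task) {
        return task.checkpointedOffsets();
    }
}
```

Keeping the `checkpointedOffsets()` accessor but delegating it leaves callers unchanged; the alternative suggested in the report, having the task manager call `stateMgr.checkpointed()` directly, would behave the same in this model.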
--
This message was sent by Atlassian Jira
(v8.3.4#803005)