[ https://issues.apache.org/jira/browse/KAFKA-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler resolved KAFKA-9169.
---------------------------------
    Resolution: Fixed

This was fixed in https://github.com/apache/kafka/pull/7681/ and merged to
trunk (currently 2.5.0-SNAPSHOT)

> Standby Tasks point ask for incorrect offsets on resuming post suspension
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-9169
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9169
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>            Reporter: Navinder Brar
>            Assignee: John Roesler
>            Priority: Critical
>             Fix For: 2.5.0
>
>
> In versions (check 2.0) where standby tasks are suspended on each rebalance,
> the checkpoint file is updated after the flush. The expected behaviour is
> that when the same standby task is assigned back to the machine after the
> rebalance, it resumes reading the changelog from the offset where it left
> off.
>
> But there appears to be a bug in the code: after every rebalance, the task
> starts reading from the offset it used the first time it was assigned to
> this machine. This has 2 repercussions:
> # After every rebalance the standby tasks restore a huge amount of data
> which they have already restored earlier (verified via a 300x increase in
> network IO on all streams instances post rebalance, even when there is no
> change in assignment).
> # If the changelog has time-based retention, those offsets will no longer be
> available in the changelog, which leads to offsetOutOfRange exceptions, and
> the stores get deleted and recreated.
>
> I have gone through the code and I think I know the issue.
> In TaskManager#updateNewAndRestoringTasks(), the function
> assignStandbyPartitions() gets called for all the running standby tasks. It
> populates the map checkpointedOffsets from
> standbyTask.checkpointedOffsets(), which is only updated when a StandbyTask
> is initialized (i.e. in its constructor).
>
> This has an easy fix.
> After resumption, we read standbyTask.checkpointedOffsets() to determine
> the offset from which the standby task should resume, rather than
> stateMgr.checkpointed(), which is updated on every commit to the checkpoint
> file. As a result, the task always resumes from the same offset and re-reads
> records it has already read; when the changelog topic has a retention time,
> this gives an offsetOutOfRange exception. So
> standbyTask.checkpointedOffsets() is of little use, and we should use
> stateMgr.checkpointed() instead to return offsets to the task manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
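
For readers following the description above, here is a minimal sketch of the stale-snapshot behaviour it reports. Every class in it (SketchStateManager, StaleStandbyTask, FreshStandbyTask) is a hypothetical stand-in written for illustration. None of this is the actual Kafka Streams code or the change made in the pull request. Only the method names checkpointed() and checkpointedOffsets() are borrowed from the report.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: simplified, hypothetical stand-ins, NOT the real Kafka Streams classes.
public class StandbyOffsetsSketch {

    // Hypothetical state manager whose checkpoint advances on every commit.
    static class SketchStateManager {
        private final Map<String, Long> checkpoint = new HashMap<>();

        void commit(String changelogPartition, long offset) {
            checkpoint.put(changelogPartition, offset);
        }

        // Returns a copy of the latest committed offsets.
        Map<String, Long> checkpointed() {
            return new HashMap<>(checkpoint);
        }
    }

    // Behaviour described in the report: offsets are copied once, in the constructor.
    static class StaleStandbyTask {
        private final Map<String, Long> checkpointedOffsets;

        StaleStandbyTask(SketchStateManager stateMgr) {
            // Snapshot taken at construction time and never refreshed.
            this.checkpointedOffsets = stateMgr.checkpointed();
        }

        Map<String, Long> checkpointedOffsets() {
            return checkpointedOffsets;
        }
    }

    // Direction proposed in the report: ask the state manager each time.
    static class FreshStandbyTask {
        private final SketchStateManager stateMgr;

        FreshStandbyTask(SketchStateManager stateMgr) {
            this.stateMgr = stateMgr;
        }

        Map<String, Long> checkpointedOffsets() {
            return stateMgr.checkpointed();
        }
    }

    public static void main(String[] args) {
        String partition = "store-changelog-0"; // made-up partition name
        SketchStateManager stateMgr = new SketchStateManager();
        stateMgr.commit(partition, 100L);

        StaleStandbyTask stale = new StaleStandbyTask(stateMgr);
        FreshStandbyTask fresh = new FreshStandbyTask(stateMgr);

        // The task keeps restoring and committing before a rebalance suspends it.
        stateMgr.commit(partition, 5000L);

        // On resume, the stale task would ask for offset 100 again.
        System.out.println("stale resume offset: " + stale.checkpointedOffsets().get(partition));
        System.out.println("fresh resume offset: " + fresh.checkpointedOffsets().get(partition));
    }
}
```

In this sketch the stale task reports 100 and the fresh one reports 5000. If retention had already removed offset 100 from the changelog, the stale value would correspond to the offsetOutOfRange case described in the report.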