Hello Navinder,

Sorry for the late reply and thanks for bringing this up. I think this is
indeed a bug that needs to be fixed.

The rationale behind it was the following: for restoring active tasks and for
processing standby tasks we use the same consumer client within the thread
(the restoreConsumer). And before ALL of the active tasks have completed
restoration, that consumer would not be assigned any of the standby tasks'
changelog partitions at all. So on a timeline, with a rebalance and assuming
KIP-429 is already in place, it looks like this:

T0: rebalance triggered, some tasks get revoked but others may still be
active;
T0-T1: a subset of active tasks (via the main consumer) and all standby
tasks (via the restore consumer) are still processing;
T1: rebalance finished, some new tasks get assigned and now need to be
restored. The restore consumer is re-assigned to fetch from those restoring
tasks' changelog partitions only;
T1-T2: the main consumer has paused all partitions, hence no active task
processing; also, the restore consumer is only fetching for the restoring
tasks, hence no standby task processing;
T2: restoration completed, the restore consumer is re-assigned to those
standby tasks' changelog partitions.
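
To make the switch at T1/T2 concrete, here is a minimal sketch using the
plain consumer API (the class and partition sets below are hypothetical and
just for illustration, not the actual StreamThread code):

import java.time.Duration;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Hypothetical illustration only: in Streams these partition sets would come
// from the assigned tasks' changelogs.
public class RestoreConsumerSwitchSketch {
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final Set<TopicPartition> restoringChangelogs;
    private final Set<TopicPartition> standbyChangelogs;

    public RestoreConsumerSwitchSketch(final Consumer<byte[], byte[]> restoreConsumer,
                                       final Set<TopicPartition> restoringChangelogs,
                                       final Set<TopicPartition> standbyChangelogs) {
        this.restoreConsumer = restoreConsumer;
        this.restoringChangelogs = restoringChangelogs;
        this.standbyChangelogs = standbyChangelogs;
    }

    // T1: new active tasks need restoration, so the restore consumer fetches
    // ONLY the restoring changelog partitions; the standby changelogs are
    // simply not assigned, so standby tasks stay "running" but make no progress.
    public void onRestorationStarted() {
        restoreConsumer.assign(restoringChangelogs);
    }

    // T2: restoration completed, switch back to the standby changelog
    // partitions so the standby tasks can proceed again.
    public void onRestorationCompleted() {
        restoreConsumer.assign(standbyChangelogs);
    }

    public ConsumerRecords<byte[], byte[]> pollOnce() {
        return restoreConsumer.poll(Duration.ofMillis(100));
    }
}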

Note that at T1 the standby tasks are all still "running", but they just do
not make progress any more since the consumer has switched to fetching other
partitions; so at T2, when the consumer switches back, processing should just
resume from where it left off.
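
Roughly, "resume from where it left off" amounts to remembering the last
fetched position per standby changelog partition before the switch and
seeking back to it afterwards. A sketch of that bookkeeping (again with the
plain consumer API; the class and map here are hypothetical, not the actual
Streams code):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical bookkeeping: last consumed position per standby changelog partition.
public class StandbyPositionBookkeeping {
    private final Map<TopicPartition, Long> lastFetchedPositions = new HashMap<>();

    // Called just before T1, while the standby changelogs are still assigned:
    // remember how far each one has been consumed.
    public void rememberPositions(final Consumer<byte[], byte[]> restoreConsumer,
                                  final Set<TopicPartition> standbyChangelogs) {
        for (final TopicPartition tp : standbyChangelogs) {
            lastFetchedPositions.put(tp, restoreConsumer.position(tp));
        }
    }

    // Called at T2, right after the standby changelogs have been re-assigned:
    // seek back to the remembered positions so standby processing continues
    // where it left off, rather than from an offset captured at task creation.
    public void restorePositions(final Consumer<byte[], byte[]> restoreConsumer) {
        for (final Map.Entry<TopicPartition, Long> entry : lastFetchedPositions.entrySet()) {
            restoreConsumer.seek(entry.getKey(), entry.getValue());
        }
    }
}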


Guozhang


On Mon, Nov 4, 2019 at 4:47 AM Navinder Brar
<navinder_b...@yahoo.com.invalid> wrote:

> Hi,
> Please let me know if this is not the correct forum to ask this. But I
> have a doubt, I was hoping someone can clear it for me.
> In TaskManager:: updateNewAndRestoringTasks(), the
> function assignStandbyPartitions() gets called for all the running standby
> tasks where it populates the Map: checkpointedOffsets from the
> standbyTask.checkpointedOffsets() which is only updated at the time of
> initialization of a StandbyTask(i.e. in it's constructor). I have checked
> and this goes way to 1.1 version when the rebalance protocol was old and
> standby tasks were suspended during rebalance and then resumed on
> assignment.
> I want to know, why post resumption we were/are reading
> standbyTask.checkpointedOffsets() to know the offset from where the standby
> task should start running and not from stateMgr.checkpointed() which gets
> updated on every commit to the checkpoint file. In the former case it's
> always reading from the same offset, even those which it had already read
> earlier and in cases where changelog topic has a retention time, it gives
> offsetOutOfRange exception.
> Regards,
> Navinder



-- 
-- Guozhang
