>
>
> What do you mean by checkpoint “assignments”? The assignment from
> shard-to-source is only fixed within a single execution of the job. We only
> checkpoint the progress of each shard in the state.
> Given that we support plugging in custom shard assignment hashing, then
> the assignment could potentially change every time we restore.
>
> If what you mean is actually retaining checkpointed shard state (i.e. the
> progress sequence number), then:
> I don’t really see a reason why a user would want to ignore checkpointed
> shard sequence numbers, but it could really just be my lack of knowledge
> for possible real user scenarios.
> Though, ignoring checkpointed shard sequence numbers on restore from a
> savepoint would immediately break exactly-once guarantees, so if we do have
> a case for that, we need to be very educative in its use and side effects.
>
>
At the moment only the shard offsets are saved, and not the subtask
association. With "checkpointed assignments" I meant saving which shards
belong to which subtask, but that may lead to problems when changing the
consumer parallelism.

It seems that balanced distribution is hard to achieve without
synchronization between subtasks. There is the possibility of subtasks
intermittently retrieving different shard lists while resharding occurs.
The assignment logic would either need to only consider the shardID and
result in skewed distribution (current implementation) or there needs to be
a barrier at which each subtask is guaranteed to see the same shard list,
which would allow for round-robin distribution.

In order to rebase the mapping after resharding, we would probably need all
subtasks to agree on the shard list and most recent offsets (distributed
consensus) and apply changes at a checkpoint barrier? I really don't see
how else we can end up with balanced shard distribution as generic solution.

Thanks,
Thomas

Reply via email to