I created a PR for further discussion: https://github.com/apache/flink/pull/5393
There are a few TODOs where I think improvements can be made. Let me know
if you agree with the overall direction.

Thanks,
Thomas

On Mon, Jan 29, 2018 at 3:01 PM, Thomas Weise <t...@apache.org> wrote:

>> What do you mean by checkpoint “assignments”? The assignment from
>> shard-to-source is only fixed within a single execution of the job. We
>> only checkpoint the progress of each shard in the state.
>> Given that we support plugging in custom shard assignment hashing, the
>> assignment could potentially change every time we restore.
>>
>> If what you mean is actually retaining checkpointed shard state (i.e.
>> the progress sequence number), then:
>> I don’t really see a reason why a user would want to ignore checkpointed
>> shard sequence numbers, but that could just be my lack of knowledge of
>> possible real user scenarios.
>> Though, ignoring checkpointed shard sequence numbers on restore from a
>> savepoint would immediately break exactly-once guarantees, so if we do
>> have a case for that, we need to be very clear about its use and side
>> effects.
>>
>
> At the moment only the shard offsets are saved, not the subtask
> association. With "checkpointed assignments" I meant saving which shards
> belong to which subtask, but that may lead to problems when changing the
> consumer parallelism.
>
> It seems that balanced distribution is hard to achieve without
> synchronization between subtasks. While resharding occurs, subtasks may
> intermittently retrieve different shard lists. The assignment logic
> therefore either has to consider only the shard ID, which leads to skewed
> distribution (the current implementation), or there has to be a barrier
> at which every subtask is guaranteed to see the same shard list, which
> would allow for round-robin distribution.
>
> In order to rebase the mapping after resharding, we would probably need
> all subtasks to agree on the shard list and most recent offsets
> (distributed consensus) and apply the changes at a checkpoint barrier? I
> really don't see how else we can end up with balanced shard distribution
> as a generic solution.
>
> Thanks,
> Thomas
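
To make the trade-off above concrete, here is a minimal, purely
illustrative sketch (not the connector's actual API; class and method
names are hypothetical) contrasting the two strategies discussed:
hashing only on the shard ID, which every subtask can compute
independently but which can skew, versus round-robin over a shard list
that all subtasks have agreed on.

import java.util.List;

/**
 * Illustrative sketch only: two ways to map Kinesis shards to consumer
 * subtasks, as discussed in the thread above.
 */
public class ShardAssignmentSketch {

    /**
     * Hash-based assignment that looks only at the shard ID. Each subtask
     * can compute this without coordination, but because Kinesis shard IDs
     * are very similar strings (e.g. "shardId-000000000042"), the hashes
     * can cluster and leave some subtasks with far more shards than others.
     */
    static int assignByShardIdHash(String shardId, int parallelism) {
        return Math.abs(shardId.hashCode() % parallelism);
    }

    /**
     * Round-robin assignment over a shard list. This only yields a balanced
     * and consistent mapping if every subtask sees the exact same list in
     * the same order, which is why the email argues for a barrier /
     * consensus point at which the shard list is agreed upon.
     */
    static int assignRoundRobin(List<String> agreedShardList, String shardId, int parallelism) {
        int index = agreedShardList.indexOf(shardId);
        if (index < 0) {
            throw new IllegalArgumentException("Shard not in agreed list: " + shardId);
        }
        return index % parallelism;
    }
}

The round-robin variant only works if agreedShardList is identical and
identically ordered on every subtask, which is exactly the consensus /
checkpoint-barrier requirement raised above.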