Hi Thomas,

Thanks a lot for opening the PR. I had a look at it and the comment you left, and added my thoughts there. In general, I think it's heading in the right direction.
Cheers,
Gordon

On 31 January 2018 at 4:03:36 PM, Thomas Weise (t...@apache.org) wrote:

Hi Gordon,

Can you have a quick look at the PR and the comment I added? That will help to polish it up and make it ready for review.

Thanks!
Thomas

--
sent from mobile

---------- Forwarded message ----------
From: "Thomas Weise" <t...@apache.org>
Date: Jan 30, 2018 5:53 PM
Subject: Re: Kinesis consumer shard skew - FLINK-8516
To: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Cc: <dev@flink.apache.org>

I created a PR for further discussion:

https://github.com/apache/flink/pull/5393

There are a few TODOs where I think improvements can be made. Let me know if you agree with the overall direction.

Thanks,
Thomas

On Mon, Jan 29, 2018 at 3:01 PM, Thomas Weise <t...@apache.org> wrote:

> What do you mean by checkpointing "assignments"? The assignment of shards to source subtasks is only fixed within a single execution of the job; we only checkpoint the progress of each shard in the state. Given that we support plugging in custom shard assignment hashing, the assignment could potentially change every time we restore.
>
> If what you mean is actually retaining checkpointed shard state (i.e. the progress sequence number): I don't really see a reason why a user would want to ignore checkpointed shard sequence numbers, but that could just be my lack of knowledge of real user scenarios. However, ignoring checkpointed shard sequence numbers on restore from a savepoint would immediately break exactly-once guarantees, so if we do have a case for that, we need to clearly educate users about its use and side effects.

At the moment only the shard offsets are saved, not the subtask association. With "checkpointed assignments" I meant saving which shards belong to which subtask, but that may lead to problems when changing the consumer parallelism.

It seems that balanced distribution is hard to achieve without synchronization between subtasks, because subtasks may intermittently retrieve different shard lists while resharding occurs. The assignment logic would either need to consider only the shard ID, which results in the skewed distribution of the current implementation, or there would need to be a barrier at which every subtask is guaranteed to see the same shard list, which would allow round-robin distribution. To rebase the mapping after resharding, we would probably need all subtasks to agree on the shard list and the most recent offsets (distributed consensus) and apply the changes at a checkpoint barrier? I really don't see how else we can end up with balanced shard distribution as a generic solution.

Thanks,
Thomas
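
To make the trade-off in the last message concrete, here is a minimal, self-contained sketch contrasting the two strategies. The names (ShardAssignmentSketch, assignByShardHash, assignRoundRobin) and the shard list are illustrative only and not the connector's actual API: the hash-based variant stands in for assignment that considers only the shard ID, while the round-robin variant assumes all subtasks already agree on one shard list.

import java.util.Arrays;
import java.util.List;

// Hypothetical names for illustration only -- not classes from the Flink Kinesis connector.
public class ShardAssignmentSketch {

    // Hash-only assignment: each subtask can decide locally from the shard ID alone,
    // so no coordination between subtasks is needed, but nothing guarantees that the
    // currently open shards of a stream spread evenly across subtasks.
    static int assignByShardHash(String shardId, int parallelism) {
        return Math.abs(shardId.hashCode() % parallelism);
    }

    // Round-robin assignment: balanced by construction, but only consistent if every
    // subtask sees the same shard list in the same order -- exactly the synchronization
    // problem described above, since shard lists can differ while resharding is in progress.
    static int assignRoundRobin(List<String> agreedShardList, String shardId, int parallelism) {
        return agreedShardList.indexOf(shardId) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 2;
        // A deliberately contrived set of open shard IDs: they differ only in the last
        // digit, in steps of two, so with parallelism 2 every ID hashes to the same
        // subtask, while round-robin spreads them evenly.
        List<String> shards = Arrays.asList(
                "shardId-000000000000",
                "shardId-000000000002",
                "shardId-000000000004",
                "shardId-000000000006",
                "shardId-000000000008");

        for (String shard : shards) {
            System.out.printf("%s  hash -> subtask %d, round-robin -> subtask %d%n",
                    shard,
                    assignByShardHash(shard, parallelism),
                    assignRoundRobin(shards, shard, parallelism));
        }
    }
}

The sketch also makes the coordination requirement visible: round-robin only stays deterministic across subtasks if the agreed shard list is identical everywhere, which is what the barrier or consensus step discussed above would have to guarantee during resharding.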