Thanks! I will clean up the PR and then open it for review. -- sent from mobile
On Jan 31, 2018 7:26 AM, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:

> Hi Thomas,
>
> Thanks a lot for opening the PR.
> I had a look at it and the comment you left, and left my thoughts there.
> In general, I think it's heading in the right direction.
>
> Cheers,
> Gordon
>
> On 31 January 2018 at 4:03:36 PM, Thomas Weise (t...@apache.org) wrote:
>
> Hi Gordon,
>
> Can you have a quick look at the PR and the comment I added? That will
> help to polish it up and make it ready for review.
>
> Thanks!
> Thomas
>
> --
> sent from mobile
>
> ---------- Forwarded message ----------
> From: "Thomas Weise" <t...@apache.org>
> Date: Jan 30, 2018 5:53 PM
> Subject: Re: Kinesis consumer shard skew - FLINK-8516
> To: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
> Cc: <dev@flink.apache.org>
>
> I created a PR for further discussion:
>>
>> https://github.com/apache/flink/pull/5393
>>
>> There are a few TODOs where I think improvements can be made. Let me know
>> if you agree with the overall direction.
>>
>> Thanks,
>> Thomas
>>
>>
>> On Mon, Jan 29, 2018 at 3:01 PM, Thomas Weise <t...@apache.org> wrote:
>>
>>>> What do you mean by checkpointing "assignments"? The shard-to-source
>>>> assignment is only fixed within a single execution of the job. We only
>>>> checkpoint the progress of each shard in the state.
>>>> Given that we support plugging in custom shard assignment hashing, the
>>>> assignment could potentially change every time we restore.
>>>>
>>>> If what you mean is actually retaining checkpointed shard state (i.e.
>>>> the progress sequence number), then:
>>>> I don't really see a reason why a user would want to ignore
>>>> checkpointed shard sequence numbers, but that could just be my lack of
>>>> knowledge of real user scenarios.
>>>> That said, ignoring checkpointed shard sequence numbers on restore from
>>>> a savepoint would immediately break exactly-once guarantees, so if we do
>>>> have a case for that, we need to be very clear about its use and side
>>>> effects.
>>>
>>> At the moment only the shard offsets are saved, not the subtask
>>> association. With "checkpointed assignments" I meant saving which shards
>>> belong to which subtask, but that may lead to problems when changing the
>>> consumer parallelism.
>>>
>>> It seems that balanced distribution is hard to achieve without
>>> synchronization between subtasks. Subtasks may intermittently retrieve
>>> different shard lists while resharding occurs. The assignment logic would
>>> either need to consider only the shardID, resulting in a skewed
>>> distribution (the current implementation), or there needs to be a barrier
>>> at which each subtask is guaranteed to see the same shard list, which
>>> would allow for round-robin distribution.
>>>
>>> To rebase the mapping after resharding, we would probably need all
>>> subtasks to agree on the shard list and the most recent offsets
>>> (distributed consensus) and apply the changes at a checkpoint barrier. I
>>> really don't see how else we can end up with a balanced shard distribution
>>> as a generic solution.
>>>
>>> Thanks,
>>> Thomas
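
To make the "plugging in custom shard assignment hashing" point above concrete, here is a minimal sketch of what a pluggable assigner could look like, together with a hash-modulo default in the spirit of the behavior discussed in the thread. The names (ShardAssigner, assign) and the use of a plain shard ID string are illustrative assumptions for this sketch, not the actual connector API added in the PR.

import java.io.Serializable;

// Hypothetical pluggable assigner: maps a shard to a subtask index given the parallelism.
// The real abstraction introduced for FLINK-8516 may differ; this only sketches the shape
// of what "custom shard assignment hashing" means.
interface ShardAssigner extends Serializable {
    int assign(String shardId, int numParallelSubtasks);
}

// Hash-modulo assignment: no coordination between subtasks is needed, but the resulting
// distribution is only as even as the hash values of the (typically sequential) shard IDs.
class HashModuloAssigner implements ShardAssigner {
    @Override
    public int assign(String shardId, int numParallelSubtasks) {
        return Math.abs(shardId.hashCode() % numParallelSubtasks);
    }
}

Each subtask would then subscribe only to the shards for which assign(shardId, parallelism) equals its own subtask index, which is why this style of assignment can be evaluated independently on every subtask without any synchronization.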
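
The following self-contained comparison illustrates the trade-off Thomas describes: hashing only the shard ID needs no coordination but its balance depends entirely on how the shard ID strings happen to hash, while round-robin is balanced by construction but is only well-defined once all subtasks agree on the same ordered shard list. The shard IDs below are made up to mimic a resharded stream; the output is illustrative, not a measurement of the actual connector.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ShardDistributionDemo {

    // Coordination-free: each subtask can evaluate this independently for any shard it discovers.
    static int hashAssign(String shardId, int parallelism) {
        return Math.abs(shardId.hashCode() % parallelism);
    }

    // Round-robin: only well-defined if every subtask sees the same, identically ordered shard
    // list, which is exactly the barrier/consensus discussed in the thread.
    static int roundRobinAssign(List<String> agreedShardList, String shardId, int parallelism) {
        return agreedShardList.indexOf(shardId) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4;

        // Simulate the open shards of a resharded stream: IDs follow the Kinesis naming
        // scheme but are no longer a dense 0..n-1 range.
        List<String> openShards = new ArrayList<>();
        for (int i : new int[] {4, 5, 6, 7, 8, 9, 10, 11, 16, 17, 18, 19}) {
            openShards.add(String.format("shardId-%012d", i));
        }
        Collections.sort(openShards);

        int[] byHash = new int[parallelism];
        int[] byRoundRobin = new int[parallelism];
        for (String shard : openShards) {
            byHash[hashAssign(shard, parallelism)]++;
            byRoundRobin[roundRobinAssign(openShards, shard, parallelism)]++;
        }

        // Round-robin is balanced by construction; the hash-based counts depend on how the
        // shard ID strings hash and can come out noticeably uneven.
        System.out.println("hash-based  shards per subtask: " + Arrays.toString(byHash));
        System.out.println("round-robin shards per subtask: " + Arrays.toString(byRoundRobin));
    }
}

In the round-robin case, the agreed shard list is exactly what the proposed synchronization barrier (or distributed consensus step) would have to provide before assignments can be recomputed after a reshard.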