Hi Thomas,

Thanks a lot for opening the PR.
I had a look at it and the comment you left, and added my thoughts there. In
general, I think it's heading in the right direction.

Cheers,
Gordon

On 31 January 2018 at 4:03:36 PM, Thomas Weise (t...@apache.org) wrote:

Hi Gordon,

Can you have a quick look at the PR and the comment I added? That will help
polish it up and make it ready for review.

Thanks!
Thomas

--
sent from mobile
---------- Forwarded message ----------
From: "Thomas Weise" <t...@apache.org>
Date: Jan 30, 2018 5:53 PM
Subject: Re: Kinesis consumer shard skew - FLINK-8516
To: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Cc: <dev@flink.apache.org>

I created a PR for further discussion:

https://github.com/apache/flink/pull/5393

There are a few TODOs where I think improvements can be made. Let me know if 
you agree with the overall direction.

Thanks,
Thomas


On Mon, Jan 29, 2018 at 3:01 PM, Thomas Weise <t...@apache.org> wrote:

What do you mean by checkpointing "assignments"? The shard-to-source assignment
is only fixed within a single execution of the job; we only checkpoint the
progress of each shard in the state.
Given that we support plugging in custom shard assignment hashing, the
assignment could potentially change every time we restore.
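
Roughly, what I mean is something along these lines (the class and method names
here are just for illustration, not the actual connector or PR code):

import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch only; names are hypothetical and do not correspond to
 * the actual Kinesis connector or PR code.
 */
public class ShardAssignmentSketch {

    /**
     * The shard-to-subtask assignment is a pure function of the shard handle,
     * the parallelism, and the subtask index. It is recomputed on every
     * (re)start and never checkpointed, so a custom hash can change the
     * assignment across restores.
     */
    static boolean isAssignedToThisSubtask(Object shardHandle, int parallelism, int subtaskIndex) {
        return Math.abs(shardHandle.hashCode() % parallelism) == subtaskIndex;
    }

    /**
     * Only the per-shard progress (shard id -> last sequence number) goes
     * into the checkpoint; the subtask association is not part of the state.
     */
    static Map<String, String> snapshotState(Map<String, String> shardToSequenceNumber) {
        return new HashMap<>(shardToSequenceNumber);
    }
}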

If what you mean is actually retaining checkpointed shard state (i.e., the
progress sequence number):
I don't really see a reason why a user would want to ignore checkpointed shard
sequence numbers, but that could simply be my lack of knowledge of real user
scenarios.
However, ignoring checkpointed shard sequence numbers on restore from a
savepoint would immediately break exactly-once guarantees, so if we do have a
case for that, we need to clearly document its use and side effects.


At the moment only the shard offsets are saved, and not the subtask 
association. With "checkpointed assignments" I meant saving which shards belong 
to which subtask, but that may lead to problems when changing the consumer 
parallelism.

It seems that balanced distribution is hard to achieve without synchronization
between subtasks. While resharding occurs, subtasks may intermittently retrieve
different shard lists. The assignment logic would therefore either have to
consider only the shard ID, which results in skewed distribution (the current
implementation), or there would have to be a barrier at which every subtask is
guaranteed to see the same shard list, which would allow round-robin
distribution.
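
To make the two options concrete, something along these lines (names again just
for illustration, not the actual connector code):

import java.util.List;

/**
 * Illustrative sketch of the two assignment options discussed above; names
 * are hypothetical and not taken from the actual connector code.
 */
public class AssignmentOptions {

    /**
     * Option 1 (the shape of the current behavior): each subtask decides
     * locally from a hash of the shard. This needs no coordination, but if
     * the hash values do not spread evenly modulo the parallelism, some
     * subtasks end up with many more shards than others (the skew in
     * FLINK-8516).
     */
    static int hashBased(Object shardHandle, int parallelism) {
        return Math.abs(shardHandle.hashCode() % parallelism);
    }

    /**
     * Option 2: round-robin over a shard list that every subtask sees in the
     * same order. Balanced by construction, but the target subtask depends on
     * the shard's position in the list, so all subtasks must agree on the
     * same list, e.g. behind a barrier during resharding.
     */
    static int roundRobin(List<String> agreedOrderedShards, String shardId, int parallelism) {
        return agreedOrderedShards.indexOf(shardId) % parallelism;
    }
}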

In order to rebase the mapping after resharding, we would probably need all
subtasks to agree on the shard list and the most recent offsets (distributed
consensus) and apply the changes at a checkpoint barrier? I really don't see
how else we can end up with balanced shard distribution as a generic solution.

Thanks,
Thomas

