Thanks! I will clean up the PR and then open it for review.

--
sent from mobile

On Jan 31, 2018 7:26 AM, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:

> Hi Thomas,
>
> Thanks a lot for opening the PR.
> I had a look at it and the comment you left, and added my thoughts there.
> In general, I think it’s heading in the right direction.
>
> Cheers,
> Gordon
>
> On 31 January 2018 at 4:03:36 PM, Thomas Weise (t...@apache.org) wrote:
>
> Hi Gordon,
>
> Can you have a quick look at the PR and the comment I added? That will
> help polish it up and make it ready for review.
>
> Thanks!
> Thomas
>
> --
> sent from mobile
> ---------- Forwarded message ----------
> From: "Thomas Weise" <t...@apache.org>
> Date: Jan 30, 2018 5:53 PM
> Subject: Re: Kinesis consumer shard skew - FLINK-8516
> To: "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
> Cc: <dev@flink.apache.org>
>
> I created a PR for further discussion:
>>
>> https://github.com/apache/flink/pull/5393
>>
>> There are a few TODOs where I think improvements can be made. Let me know
>> if you agree with the overall direction.
>>
>> Thanks,
>> Thomas
>>
>>
>> On Mon, Jan 29, 2018 at 3:01 PM, Thomas Weise <t...@apache.org> wrote:
>>
>>>
>>>> What do you mean by checkpoint “assignments”? The assignment from
>>>> shard-to-source is only fixed within a single execution of the job. We only
>>>> checkpoint the progress of each shard in the state.
>>>> Given that we support plugging in custom shard assignment hashing, then
>>>> the assignment could potentially change every time we restore.
>>>>
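>>>> To make this concrete, a minimal sketch of the kind of pluggable
>>>> assignment I have in mind (hypothetical interface and names, not the
>>>> exact connector API):
>>>>
>>>>     // Hypothetical: whatever this returns, taken modulo the consumer
>>>>     // parallelism, decides which subtask subscribes to the shard. A
>>>>     // different implementation on restore yields a different mapping.
>>>>     interface ShardAssignerSketch {
>>>>         int assign(String shardId, int numParallelSubtasks);
>>>>     }
>>>>
>>>>     // Purely hash-based strategy, a function of the shard id only.
>>>>     class HashAssignerSketch implements ShardAssignerSketch {
>>>>         public int assign(String shardId, int numParallelSubtasks) {
>>>>             return Math.floorMod(shardId.hashCode(), numParallelSubtasks);
>>>>         }
>>>>     }
>>>>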
>>>> If what you mean is actually retaining checkpointed shard state (i.e.
>>>> the progress sequence number), then:
>>>> I don’t really see a reason why a user would want to ignore
>>>> checkpointed shard sequence numbers, but that could simply be my lack
>>>> of knowledge of real user scenarios.
>>>> That said, ignoring checkpointed shard sequence numbers on restore from
>>>> a savepoint would immediately break exactly-once guarantees, so if we
>>>> do have a case for that, we need to clearly document its use and side
>>>> effects.
>>>>
>>>>
>>> At the moment only the shard offsets are saved, and not the subtask
>>> association. With "checkpointed assignments" I meant saving which shards
>>> belong to which subtask, but that may lead to problems when changing the
>>> consumer parallelism.
>>>
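>>> Rough sketch of the state involved (simplified types, not the actual
>>> connector classes):
>>>
>>>     import java.util.Map;
>>>
>>>     class CheckpointStateSketch {
>>>         // What is checkpointed today: progress keyed by shard only, so
>>>         // the shard-to-subtask association is recomputed on every restore.
>>>         Map<String, String> sequenceNumberPerShardId;
>>>
>>>         // What "checkpointed assignments" would add: a persisted subtask
>>>         // index per shard, which breaks down once the parallelism changes.
>>>         Map<String, Integer> subtaskIndexPerShardId;
>>>     }
>>>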
>>> It seems that balanced distribution is hard to achieve without
>>> synchronization between subtasks. While resharding occurs, subtasks may
>>> intermittently retrieve different shard lists. The assignment logic
>>> therefore either has to consider only the shard ID, which results in
>>> skewed distribution (the current implementation), or there needs to be a
>>> barrier at which every subtask is guaranteed to see the same shard list,
>>> which would allow for round-robin distribution (see the sketch below).
>>>
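>>> Sketch of the two options, purely illustrative (not the connector's
>>> actual code):
>>>
>>>     import java.util.List;
>>>
>>>     class AssignmentSketch {
>>>
>>>         // (1) Current approach: a purely local decision based on the
>>>         //     shard id hash. Needs no coordination, but nothing prevents
>>>         //     many shards from hashing to the same subtask, hence the skew.
>>>         static boolean ownsShardByHash(String shardId, int subtaskIndex,
>>>                 int parallelism) {
>>>             return Math.floorMod(shardId.hashCode(), parallelism) == subtaskIndex;
>>>         }
>>>
>>>         // (2) Round-robin over the shard list: balanced by construction,
>>>         //     but only correct if every subtask sees the exact same
>>>         //     ordered list, hence the need for a barrier while resharding.
>>>         static boolean ownsShardByPosition(List<String> agreedSortedShardIds,
>>>                 String shardId, int subtaskIndex, int parallelism) {
>>>             int position = agreedSortedShardIds.indexOf(shardId);
>>>             return position % parallelism == subtaskIndex;
>>>         }
>>>     }
>>>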
>>> In order to rebase the mapping after resharding, we would probably need
>>> all subtasks to agree on the shard list and the most recent offsets
>>> (distributed consensus) and apply the changes at a checkpoint barrier?
>>> I really don't see how else we could end up with a balanced shard
>>> distribution as a generic solution.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>
