[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15751837#comment-15751837 ]

ASF GitHub Bot commented on FLINK-4821:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3001
  
    @tony810430 (cc @StephanEwen, f.y.i.)
    After taking a second, closer look, I'm afraid this PR can't be merged as 
is. The problem is that the state redistribution of `ListCheckpointed` doesn't 
work with the Kinesis consumer's current shard discovery mechanism.
    
    On restore, each subtask uses the restored state it receives to set its 
"last seen shard ID". With this value set, the subtask will only discover 
shards that come after the "last seen shard ID". The subtask then determines 
which of the newly discovered shards it is responsible for consuming, using a 
simple modulo operation on the shards' hash values.
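    
    To make that assignment rule concrete, here's a minimal sketch of the 
modulo-on-hash decision (the class and method names are hypothetical, not the 
actual code in the consumer; it only illustrates the idea):
    
    ```java
    public class ShardAssignmentSketch {
    
        /**
         * A discovered shard is consumed by the subtask whose index equals the
         * shard's hash modulo the total number of consumer subtasks.
         */
        public static boolean shouldSubscribeTo(
                String shardId, int totalNumberOfSubtasks, int indexOfThisSubtask) {
            return Math.abs(shardId.hashCode() % totalNumberOfSubtasks) == indexOfThisSubtask;
        }
    
        public static void main(String[] args) {
            // With 2 subtasks, each shard ID lands on exactly one subtask index.
            System.out.println(shouldSubscribeTo("shardId-000000000001", 2, 0));
            System.out.println(shouldSubscribeTo("shardId-000000000001", 2, 1));
        }
    }
    ```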
    
    This worked before, when restored state could not be redistributed, 
because each subtask was always restored exactly the shards that belong to it 
(i.e. the ones matching its modulo-on-hash assignment).
    
    The state redistribution on restore for `ListCheckpointed` breaks this. For 
example:
    Job starts with only 1 subtask for FlinkKinesisConsumer, and the Kinesis 
stream has 2 shards:
    subtask #1 --> shard1, shard2.
    
    After a restore with increased parallelism to 2, let's say the list state 
gets redistributed as:
    subtask #1 --> shard1
    subtask #2 --> shard2
    
    Subtask #1's _last seen shard ID_ will be set to shard1, and it will 
therefore discover shard2 as a new shard afterwards. If shard2 gets hashed to 
subtask #1, we'll have both subtasks consuming shard2.
    
    Changing the hashing / subtask-to-shard assignment used for shard 
discovery probably can't solve the problem, because no matter how we change 
it, the outcome will still depend on what the list state redistribution looks 
like.
    
    The only way I can see of solving this would probably be to have merged 
state on restore, so that every subtask can set its "last seen shard ID" to 
the largest shard ID across all subtasks, not just the ones in its local 
restored state.
    
    In FLIP-8 I see the community has also discussed an interface for merged 
state (a unioned list state on restore). I think that would be really useful 
in this particular case. It'll also be relevant for the Kafka connector; right 
now it only seems irrelevant there because the Kafka consumer doesn't have 
partition discovery yet.
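    
    Just to sketch what I have in mind with such a unioned list state on 
restore (purely hypothetical; the method names, the state format, and the 
union-style redistribution itself are assumptions based on the FLIP-8 
discussion, not existing API):
    
    ```java
    import org.apache.flink.api.java.tuple.Tuple2;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class UnionRestoreSketch {
    
        private final Map<String, String> shardsToConsume = new HashMap<>();
        private String lastSeenShardId;
    
        /** Restores from the union of all subtasks' (shardId, sequenceNumber) entries. */
        public void restore(
                Iterable<Tuple2<String, String>> unionOfAllSubtaskStates,
                int totalNumberOfSubtasks,
                int indexOfThisSubtask) {
    
            for (Tuple2<String, String> shardAndSequence : unionOfAllSubtaskStates) {
                String shardId = shardAndSequence.f0;
    
                // Keep only the shards this subtask is responsible for, using the
                // same modulo-on-hash rule as shard discovery ...
                if (Math.abs(shardId.hashCode() % totalNumberOfSubtasks) == indexOfThisSubtask) {
                    shardsToConsume.put(shardId, shardAndSequence.f1);
                }
    
                // ... but advance the discovery marker past every restored shard, so
                // discovery can never re-assign a shard another subtask already owns.
                if (lastSeenShardId == null || shardId.compareTo(lastSeenShardId) > 0) {
                    lastSeenShardId = shardId;
                }
            }
        }
    }
    ```
    
    In the 1-to-2 rescale example above, both subtasks would then see the 
union {shard1, shard2}, set their "last seen shard ID" to shard2, and neither 
would re-discover shard2, so the duplicate consumption goes away.
    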
    @StefanRRichter could you perhaps provide some insight on the merged state 
aspect? I'm not that familiar yet with the recent work and progress on 
repartitionable state.


> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
>                 Key: FLINK-4821
>                 URL: https://issues.apache.org/jira/browse/FLINK-4821
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>             Fix For: 1.2.0
>
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the 
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement 
> it too. This ticket tracks the progress of that work.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
