[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15751837#comment-15751837 ]
ASF GitHub Bot commented on FLINK-4821:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3001

@tony810430 (cc @StephanEwen, f.y.i.)

At a second, closer look, I'm afraid this PR can't be merged as is. The problem is that the state redistribution of `ListCheckpointed` doesn't work with the Kinesis consumer's current shard discovery mechanism.

On restore, each subtask uses the restored states it gets to set its "last seen shard ID". With this value set, the subtask will only discover shards that come after the "last seen shard ID". The subtask then determines which of the newly discovered shards it should be responsible for consuming, using a simple modulo operation on the shards' hash values (see the first sketch at the end of this message).

This worked before, when restored state could not be redistributed, because each subtask was always restored with exactly the shards that belong to it (i.e., via the modulo-on-hash assignment). The state redistribution on restore for `ListCheckpointed` breaks this. For example: a job starts with only 1 subtask for FlinkKinesisConsumer, and the Kinesis stream has 2 shards:

subtask #1 --> shard1, shard2

After a restore with parallelism increased to 2, the list state might get redistributed as:

subtask #1 --> shard1
subtask #2 --> shard2

Subtask #1's "last seen shard ID" will be set to shard1, so it will afterwards discover shard2 as a new shard. If shard2 gets hashed to subtask #1, we'll have both subtasks consuming shard2.

Changing the hashing / subtask-to-shard assignment for shard discovery probably can't solve the problem, because no matter how we change it, the assignment will still depend on how the list state happens to be redistributed.

The only way I can see to solve this would be to have merged state on restore, so that all subtasks can set the "last seen shard ID" to the largest ID across all subtasks, not just the local subtask (see the second sketch at the end of this message). In FLIP-8 I see the community has also discussed an interface for merged state (a unioned list state on restore). I think that will be really useful in this particular case. It will also be relevant for the Kafka connector; right now it only seems irrelevant because the Kafka consumer doesn't have partition discovery yet.

@StefanRRichter could you perhaps provide some insight on the merged state aspect? I'm not that familiar yet with the recent work and progress on repartitionable states.

> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
>                 Key: FLINK-4821
>                 URL: https://issues.apache.org/jira/browse/FLINK-4821
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>             Fix For: 1.2.0
>
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement
> it too. This ticket tracks progress for this.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
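
For concreteness, here is a minimal, self-contained sketch of the modulo-on-hash shard assignment described in the comment. It is an illustration under assumptions, not the connector's actual code: the `isOwnedByThisSubtask` helper and the shard IDs are hypothetical. The point it demonstrates is that the hash-based owner of a shard is fixed by the shard ID and the parallelism alone, independent of how the restored list state was redistributed, which is exactly why a redistributed shard can end up consumed by two subtasks.

```java
import java.util.Arrays;
import java.util.List;

public class ShardAssignmentSketch {

    // Hypothetical helper mirroring the described mechanism: a discovered shard
    // belongs to the subtask whose index equals the shard's hash modulo the
    // consumer parallelism. floorMod avoids negative results for negative hashes.
    static boolean isOwnedByThisSubtask(String shardId, int subtaskIndex, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism) == subtaskIndex;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shardId-000000000001", "shardId-000000000002");
        int parallelism = 2;
        // This assignment is computed from the shard ID alone; it knows nothing
        // about which subtask's restored list state the shard actually came from.
        for (String shard : shards) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                if (isOwnedByThisSubtask(shard, subtask, parallelism)) {
                    System.out.println(shard + " hashes to subtask #" + (subtask + 1));
                }
            }
        }
    }
}
```

If shard2 above hashes to subtask #1 while the redistributed list state placed it on subtask #2, subtask #1 will re-discover and consume it as well, which is the duplicate-consumption scenario described in the comment.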
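And here is a sketch of the proposed merged-state fix, assuming a unioned list state on restore (as discussed in FLIP-8) were available, i.e., every subtask sees the restored shard states of all subtasks. The `maxShardId` helper and the lexicographic comparison of Kinesis shard ID strings are illustrative assumptions, not an existing API.

```java
import java.util.Arrays;
import java.util.List;

public class LastSeenShardIdSketch {

    // Hypothetical helper: with a union view of ALL subtasks' restored shard
    // states, each subtask can set its "last seen shard ID" to the globally
    // largest restored ID, so no already-known shard is re-discovered as new.
    static String maxShardId(List<String> allRestoredShardIds) {
        String lastSeen = null;
        for (String shardId : allRestoredShardIds) {
            // Assumes shard IDs compare lexicographically in discovery order.
            if (lastSeen == null || shardId.compareTo(lastSeen) > 0) {
                lastSeen = shardId;
            }
        }
        return lastSeen;
    }

    public static void main(String[] args) {
        // With a union view, both subtasks in the example restore
        // {shard1, shard2} and compute the same "last seen shard ID",
        // so neither of them re-discovers shard2 after the rescale.
        List<String> union = Arrays.asList("shardId-000000000001", "shardId-000000000002");
        System.out.println("last seen shard ID = " + maxShardId(union));
    }
}
```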