Hi Thomas,

Yes, you are right that sorting the shards and then assigning the shard-subtask mapping by index would not be deterministic. Non-deterministic assignments would cause issues when restoring the consumer state.
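A quick way to see the problem (a hypothetical sketch, not the actual FlinkKinesisConsumer code — shard ids and parallelism are made up): with sorted index-based assignment, a reshard that adds a shard shifts the indices of every shard sorting after it, so previously assigned shards move to different subtasks. A pure hash of the shard id depends only on the id itself and is therefore stable across reshards:

```java
import java.util.*;

public class AssignmentDeterminism {
    // Index-based: sort the shard ids, assign by position modulo parallelism.
    static Map<String, Integer> byIndex(List<String> shards, int parallelism) {
        List<String> sorted = new ArrayList<>(shards);
        Collections.sort(sorted);
        Map<String, Integer> assignment = new HashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            assignment.put(sorted.get(i), i % parallelism);
        }
        return assignment;
    }

    // Hash-based: depends only on the shard id, not on which other shards exist.
    static int byHash(String shardId, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 2;
        List<String> before = Arrays.asList("shard-0002", "shard-0003");
        // A reshard introduces a shard that sorts before the existing ones.
        List<String> after = Arrays.asList("shard-0001", "shard-0002", "shard-0003");

        // Index-based assignment of shard-0002 changes across the reshard.
        System.out.println(byIndex(before, parallelism).get("shard-0002")); // 0
        System.out.println(byIndex(after, parallelism).get("shard-0002"));  // 1

        // Hash-based assignment of shard-0002 is unaffected by the new shard.
        System.out.println(byHash("shard-0002", parallelism));
    }
}
```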
Regarding centralizing shard assignment: there has actually been ongoing discussion about centralizing shard / partition discovery, which should also be able to resolve this issue.

For now, a general solution would be to move away from distributing shards over subtasks in a round-robin fashion and instead use simple uniform hashing. Compared to the current approach, this would avoid serious skew in specific Kinesis rescaling scenarios; the trade-off is that for streams that were never resharded, the distribution may not be perfectly even.

> Any opinions on this? Would it be acceptable to make changes to the existing operator that make the shard assignment logic and hashing easier to customize?

Making the assignment logic pluggable actually sounds like a good overall solution. The shard state in the FlinkKinesisConsumer is a union list state, meaning that all consumer subtasks will see all shard states on restore. This should allow us to apply different shard assignment logic when restoring.

Cc'ing Aljoscha on this topic as well.

Cheers,
Gordon

On 26 January 2018 at 5:47:05 PM, Thomas Weise (t...@apache.org) wrote:

Hi,

We noticed an uneven distribution of shards over subtasks after resharding. We believe that our use case can be addressed by sorting the shards and assigning them to subtasks by index, with caveats. The main problem is that the shard-subtask mapping won't be deterministic, while the current hash-based solution is (but causes skew). This trade-off may be difficult to overcome (for a generalized solution) without centralizing the shard assignment, which would in turn require something like side inputs.

Any opinions on this? Would it be acceptable to make changes to the existing operator that make the shard assignment logic and hashing easier to customize?

Thanks,
Thomas
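To make the union-state point above concrete, here is a hypothetical plain-Java sketch (not the actual FlinkKinesisConsumer restore code, and the names are made up): because every subtask sees the full union of shard states on restore, each one can independently re-filter that union with whatever assignment function is in effect, including a newly plugged-in one.

```java
import java.util.*;
import java.util.function.BiFunction;

public class UnionStateRestoreSketch {
    // On restore, a subtask receives the union of all shard states and keeps
    // only the shards that the assignment function maps to its own index.
    static List<String> restore(List<String> unionOfAllShards,
                                int subtaskIndex,
                                int parallelism,
                                BiFunction<String, Integer, Integer> assigner) {
        List<String> mine = new ArrayList<>();
        for (String shard : unionOfAllShards) {
            if (assigner.apply(shard, parallelism) == subtaskIndex) {
                mine.add(shard);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> union = Arrays.asList(
            "shard-0001", "shard-0002", "shard-0003", "shard-0004");
        int parallelism = 2;
        BiFunction<String, Integer, Integer> hashAssigner =
            (shard, p) -> Math.floorMod(shard.hashCode(), p);

        // Each subtask filters the same union independently; together they
        // cover every shard exactly once, since the assigner maps each shard
        // to exactly one subtask index.
        Set<String> covered = new TreeSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            covered.addAll(restore(union, subtask, parallelism, hashAssigner));
        }
        System.out.println(covered.size()); // 4: no shard lost or duplicated
    }
}
```

The design point this illustrates: since the filtering happens per-subtask against the full union, swapping the assigner between savepoint and restore cannot lose or duplicate shards, as long as the assigner maps each shard id to exactly one subtask.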