[ https://issues.apache.org/jira/browse/FLINK-24639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17435098#comment-17435098 ]
John Karp commented on FLINK-24639:
-----------------------------------

Thanks [~dannycranmer], I opened https://github.com/apache/flink/pull/17583

Regarding the multi-stream case: if the streams have identical numbers of shards and identical shard ranges, they will get the same distribution. For example, with two subtasks and two single-shard streams, both shards will be assigned to the same subtask. (The default assigner has a similar issue: its hash does not include the region or account number, so identically named streams in multiple regions can also end up with non-independent assignments.) I think the situation can be improved in the UniformShardAssigner, though: the index it computes can be offset by a hash of the stream name, region, and account. A rough sketch follows at the end of this message.

> Improve assignment of Kinesis shards to subtasks
> ------------------------------------------------
>
>                 Key: FLINK-24639
>                 URL: https://issues.apache.org/jira/browse/FLINK-24639
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kinesis
>            Reporter: John Karp
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: Screen Shot 2021-10-25 at 5.11.29 PM.png
>
>
> The default assigner of Kinesis shards to Flink subtasks simply takes the hashCode() of the StreamShardHandle (an integer), which is then interpreted modulo the number of subtasks. This gives a random-looking but deterministic assignment of shards to subtasks.
> However, it can leave some subtasks with several times as many shards as others. To keep those unlucky subtasks from being overloaded, the overall Flink cluster must be over-provisioned so that every subtask has headroom to absorb any over-assignment of shards.
> We can do better here, at least if Kinesis is being used in a common way. Each record sent to a Kinesis stream has a hash key in the range [0, 2^128), which determines which shard receives it; each shard has an assigned range of hash keys. By default, Kinesis gives each shard an equal fraction of the hash-key space, and when you scale up or down using UpdateShardCount it tries to maintain equal fractions to the extent possible. Also, a shard's hash-key range is fixed at creation; it can only be replaced by new shards that split or merge it.
> Given the above, one way to assign shards to subtasks is a linear mapping from hash keys in [0, 2^128) to subtask indices in [0, nSubtasks). For the 'coordinate' of each shard we pick the middle of the shard's range, ensuring that neither subtask 0 nor subtask (n-1) is assigned too many shards.
> However, this will probably not help Kinesis users who do not assign partition or hash keys to their records randomly; the existing assigner is probably better for them.
> I ran a simulation of the default shard assigner versus some alternatives, using shards taken from one of our Kinesis streams; results attached. The measure I used, which I call 'overload', counts how many times more shards the most heavily loaded subtask holds than is necessary. (DEFAULT is the default assigner, Sha256 is similar to the default but with a stronger hash function, ShardId extracts the shard number from the shardId and uses that, and HashKey is the approach described above.)
> Patch is at: https://github.com/apache/flink/compare/master...john-karp:uniform-shard-assigner?expand=1
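For illustration, here is a minimal sketch combining the two ideas above: the midpoint-based linear mapping from the issue description, plus the per-stream hash offset suggested in the comment. The class and method names are hypothetical and the logic is simplified; see the PR for the actual implementation.

{code:java}
import java.math.BigInteger;

public class UniformShardAssignerSketch {

    // Kinesis hash keys span [0, 2^128).
    private static final BigInteger HASH_KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    /**
     * Maps a shard to a subtask index by taking the midpoint of the shard's
     * hash-key range and scaling it linearly into [0, nSubtasks), then
     * offsetting by a per-stream hash so that streams with identical shard
     * layouts do not all land on the same subtasks.
     *
     * @param startingHashKey inclusive lower bound of the shard's hash-key range
     * @param endingHashKey   inclusive upper bound of the shard's hash-key range
     * @param streamHash      hash of stream name, region, and account
     * @param nSubtasks       number of parallel subtasks
     */
    static int assign(BigInteger startingHashKey,
                      BigInteger endingHashKey,
                      int streamHash,
                      int nSubtasks) {
        // Use the middle of the shard's range as its coordinate, so that
        // neither subtask 0 nor subtask (n-1) is assigned too many shards.
        BigInteger mid = startingHashKey.add(endingHashKey).shiftRight(1);

        // Linear mapping: floor(mid * nSubtasks / 2^128) falls in [0, nSubtasks).
        int index = mid.multiply(BigInteger.valueOf(nSubtasks))
                       .divide(HASH_KEY_SPACE)
                       .intValueExact();

        // Offset by the per-stream hash; floorMod keeps the result in range
        // even when streamHash is negative.
        return Math.floorMod(index + streamHash, nSubtasks);
    }
}
{code}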