[ https://issues.apache.org/jira/browse/FLINK-24639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434329#comment-17434329 ]
Martijn Visser commented on FLINK-24639:
----------------------------------------

[~dannycranmer] What's your opinion on this one?

> Improve assignment of Kinesis shards to subtasks
> ------------------------------------------------
>
>                 Key: FLINK-24639
>                 URL: https://issues.apache.org/jira/browse/FLINK-24639
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kinesis
>            Reporter: John Karp
>            Priority: Major
>         Attachments: Screen Shot 2021-10-25 at 5.11.29 PM.png
>
>
> The default assigner of Kinesis shards to Flink subtasks simply takes the
> hashCode() of the StreamShardHandle (an integer), which is then interpreted
> modulo the number of subtasks. This basically does random-ish but
> deterministic assignment of shards to subtasks.
> However, this can lead to some subtasks getting several times the number of
> shards as others. To prevent those unlucky subtasks from being overloaded,
> the overall Flink cluster must be over-provisioned, so that each subtask has
> more headroom to handle any over-assignment of shards.
> We can do better here, at least if Kinesis is being used in a common way.
> Each record sent to a Kinesis stream has a particular hash key in the range
> [0, 2^128), which is used to determine which shard gets used; each shard has
> an assigned range of hash keys. By default Kinesis assigns each shard equal
> fractions of the hash-key space. And when you scale up or down using
> UpdateShardCount, it tries to maintain equal fractions to the extent
> possible. Also, a shard's hash key range is fixed at creation; it can only be
> replaced by new shards, which split it, or merge it.
> Given the above, one way to assign shards to subtasks is to do a linear
> mapping from hash-keys in range [0, 2^128) to subtask indices in [0,
> nSubtasks). For the 'coordinate' of each shard we pick the middle of the
> shard's range, to ensure neither subtask 0 nor subtask (n-1) is assigned too
> many.
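The linear mapping described in the quoted paragraph above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the linked patch: the class name `UniformShardAssignerSketch` and the method `assign` are hypothetical, and the sketch assumes each shard's hash-key range is given as inclusive start and end values, which is how Kinesis reports a shard's hash key range.

```java
import java.math.BigInteger;
import java.util.Arrays;

// Hypothetical sketch of the hash-key based assignment; not the patch itself.
public class UniformShardAssignerSketch {

    /** Size of the Kinesis hash-key space, 2^128. */
    static final BigInteger HASH_KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    /**
     * Maps a shard's hash-key range (both ends inclusive) to a subtask
     * index in [0, nSubtasks).
     */
    static int assign(BigInteger startingHashKey, BigInteger endingHashKey, int nSubtasks) {
        // The shard's 'coordinate' is the middle of its hash-key range.
        BigInteger mid = startingHashKey.add(endingHashKey).shiftRight(1);
        // Linear mapping: floor(mid * nSubtasks / 2^128).
        return mid.multiply(BigInteger.valueOf(nSubtasks)).shiftRight(128).intValueExact();
    }

    public static void main(String[] args) {
        // Assumption for the demo: 8 shards with equal fractions of the
        // hash-key space, spread over 4 subtasks.
        int nShards = 8;
        int nSubtasks = 4;
        BigInteger width = HASH_KEY_SPACE.divide(BigInteger.valueOf(nShards));
        int[] shardsPerSubtask = new int[nSubtasks];
        for (int i = 0; i < nShards; i++) {
            BigInteger start = width.multiply(BigInteger.valueOf(i));
            BigInteger end = start.add(width).subtract(BigInteger.ONE);
            shardsPerSubtask[assign(start, end, nSubtasks)]++;
        }
        System.out.println(Arrays.toString(shardsPerSubtask)); // prints [2, 2, 2, 2]
    }
}
```

With evenly sized shards, every subtask ends up with the same number of shards. How well this holds for a stream with a history of uneven splits and merges is not something the sketch addresses.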
> However, this will probably not be helpful for Kinesis users who don't
> randomly assign partition or hash keys to Kinesis records. The existing
> assigner is probably better for them.
> I ran a simulation of the default shard assigner versus some alternatives,
> using shards taken from one of our Kinesis streams; results attached. I call
> the measure I used 'overload': it is how many times more shards the most
> heavily-loaded subtask has than is necessary. (DEFAULT is the default
> assigner, Sha256 is similar to the default but with a stronger hashing
> function, ShardId extracts the shard number from the shardId and uses that,
> and HashKey is the one I describe above.)
> Patch is at:
> https://github.com/apache/flink/compare/master...john-karp:uniform-shard-assigner?expand=1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)