[ 
https://issues.apache.org/jira/browse/FLINK-24639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434329#comment-17434329
 ] 

Martijn Visser commented on FLINK-24639:
----------------------------------------

[~dannycranmer] What's your opinion on this one? 

> Improve assignment of Kinesis shards to subtasks
> ------------------------------------------------
>
>                 Key: FLINK-24639
>                 URL: https://issues.apache.org/jira/browse/FLINK-24639
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kinesis
>            Reporter: John Karp
>            Priority: Major
>         Attachments: Screen Shot 2021-10-25 at 5.11.29 PM.png
>
>
> The default assigner of Kinesis shards to Flink subtasks takes the
> hashCode() of the StreamShardHandle (an integer) and reduces it modulo the
> number of subtasks. This gives a pseudo-random but deterministic assignment
> of shards to subtasks.
> However, this can leave some subtasks with several times as many shards as
> others. To prevent those unlucky subtasks from being overloaded, the overall
> Flink cluster must be over-provisioned, so that each subtask has more
> headroom to handle any over-assignment of shards.
> We can do better here, at least if Kinesis is being used in a common way. 
> Each record sent to a Kinesis stream has a particular hash key in the range 
> [0, 2^128), which is used to determine which shard gets used; each shard has 
> an assigned range of hash keys. By default Kinesis assigns each shard equal 
> fractions of the hash-key space. And when you scale up or down using 
> UpdateShardCount, it tries to maintain equal fractions to the extent 
> possible. Also, a shard's hash key range is fixed at creation; the shard can
> only be replaced by new shards that split it or merge it with an adjacent
> shard.
> Given the above, one way to assign shards to subtasks is to do a linear 
> mapping from hash-keys in range [0, 2^128) to subtask indices in [0, 
> nSubtasks). For the 'coordinate' of each shard we pick the middle of the
> shard's range, to ensure neither subtask 0 nor subtask (n-1) is assigned too
> many shards.
> However, this will probably not be helpful for Kinesis users who don't
> randomly assign partition or hash keys to Kinesis records. The existing
> assigner is probably better for them.
> I ran a simulation of the default shard assigner versus some alternatives, 
> using shards taken from one of our Kinesis streams; results attached. The
> measure I used, which I call 'overload', is how many times more shards the
> most heavily-loaded subtask has than is necessary. (DEFAULT is the
> default assigner, Sha256 is similar to the default but with a stronger 
> hashing function, ShardId extracts the shard number from the shardId and uses 
> that, and HashKey is the one I describe above.)
> Patch is at: 
> https://github.com/apache/flink/compare/master...john-karp:uniform-shard-assigner?expand=1

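For readers following the thread, the hash-key mapping described in the issue can be sketched roughly as below. This is an illustrative Python sketch, not the code from the linked patch and not the Flink connector API: the function names, the inclusive range bounds, and the `even_shards` helper are all assumptions made for the example.

```python
HASH_KEY_SPACE = 2 ** 128  # Kinesis hash keys lie in [0, 2^128)


def assign_by_hash_key(start_hash_key: int, end_hash_key: int, n_subtasks: int) -> int:
    """Map a shard to a subtask index using the midpoint of its hash-key range.

    Hypothetical function: bounds are assumed inclusive, as in a Kinesis
    HashKeyRange (StartingHashKey / EndingHashKey).
    """
    if n_subtasks <= 0:
        raise ValueError("n_subtasks must be positive")
    if not 0 <= start_hash_key <= end_hash_key < HASH_KEY_SPACE:
        raise ValueError("hash-key range must lie within [0, 2^128)")
    # The shard's 'coordinate' is the middle of its range.
    midpoint = (start_hash_key + end_hash_key) // 2
    # Linear mapping from [0, 2^128) onto [0, n_subtasks), in exact integer math.
    return midpoint * n_subtasks // HASH_KEY_SPACE


def even_shards(n_shards: int) -> list[tuple[int, int]]:
    """Hypothetical helper: split the key space into n_shards equal inclusive ranges."""
    bounds = [i * HASH_KEY_SPACE // n_shards for i in range(n_shards + 1)]
    return [(bounds[i], bounds[i + 1] - 1) for i in range(n_shards)]


if __name__ == "__main__":
    # Eight evenly sized shards spread over four subtasks.
    for start, end in even_shards(8):
        print(assign_by_hash_key(start, end, 4))
```

With evenly split shards this gives every subtask the same number of shards. As the issue notes, that only helps when the shards really do cover roughly equal fractions of the hash-key space.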

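The 'overload' measure from the issue's simulation could be computed along these lines. This is a sketch under one assumption that the issue does not spell out: the "necessary" load is taken to be ceil(shards / subtasks), the best any assigner could achieve for its busiest subtask. The function name and input format are hypothetical.

```python
from collections import Counter


def overload(assignments: list[int], n_subtasks: int) -> float:
    """Ratio of the busiest subtask's shard count to the unavoidable maximum.

    `assignments` holds one subtask index per shard. The unavoidable maximum
    is assumed to be ceil(n_shards / n_subtasks).
    """
    if not assignments or n_subtasks <= 0:
        raise ValueError("need at least one shard and one subtask")
    busiest = max(Counter(assignments).values())
    # Integer ceiling division: -(-a // b) == ceil(a / b) for positive b.
    necessary = -(-len(assignments) // n_subtasks)
    return busiest / necessary


if __name__ == "__main__":
    print(overload([0, 0, 1, 1, 2, 2, 3, 3], 4))  # perfectly balanced
    print(overload([0, 0, 0, 1], 4))              # one subtask holds three shards
```

A value of 1.0 means the busiest subtask holds no more shards than it has to. Larger values show how much extra headroom that subtask would need.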

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
