John Karp created FLINK-24639:
---------------------------------

             Summary: Improve assignment of Kinesis shards to subtasks
                 Key: FLINK-24639
                 URL: https://issues.apache.org/jira/browse/FLINK-24639
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kinesis
            Reporter: John Karp
         Attachments: Screen Shot 2021-10-25 at 5.11.29 PM.png

The default assigner of Kinesis shards to Flink subtasks simply takes the 
hashCode() of the StreamShardHandle (an integer), which is then reduced 
modulo the number of subtasks. The result is a pseudo-random but deterministic 
assignment of shards to subtasks.
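
For illustration, the hash-modulo scheme described above can be sketched 
roughly as follows. This is a standalone sketch, not the connector's actual 
code: the class and method names are hypothetical, and the use of 
Math.floorMod to keep the index non-negative is an assumption made here.

```java
public final class HashModuloAssignment {

    private HashModuloAssignment() {}

    /**
     * Hypothetical helper: maps an integer shard hash (for example the
     * hashCode() of a shard handle) to a subtask index in [0, numSubtasks).
     * floorMod is used so that negative hash codes still yield a valid index.
     */
    static int subtaskFor(int shardHash, int numSubtasks) {
        if (numSubtasks <= 0) {
            throw new IllegalArgumentException("numSubtasks must be positive");
        }
        return Math.floorMod(shardHash, numSubtasks);
    }

    /** Counts how many of the given shard hashes land on each subtask. */
    static int[] shardsPerSubtask(int[] shardHashes, int numSubtasks) {
        int[] counts = new int[numSubtasks];
        for (int hash : shardHashes) {
            counts[subtaskFor(hash, numSubtasks)]++;
        }
        return counts;
    }
}
```

Nothing in this scheme looks at how many shards a subtask already holds, so 
the per-subtask counts are only balanced to the extent the hash codes happen 
to spread evenly.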

However, this can leave some subtasks with several times as many shards as 
others. To prevent those unlucky subtasks from being overloaded, the overall 
Flink cluster must be over-provisioned, so that each subtask has enough 
headroom to handle any over-assignment of shards.

We can do better here, at least if Kinesis is being used in a common way. Each 
record sent to a Kinesis stream has a particular hash key in the range [0, 
2^128), which is used to determine which shard gets used; each shard has an 
assigned range of hash keys. By default Kinesis assigns each shard equal 
fractions of the hash-key space. And when you scale up or down using 
UpdateShardCount, it tries to maintain equal fractions to the extent possible. 
Also, a shard's hash key range is fixed at creation; the shard can only be 
replaced by new shards, which either split it or merge it with an adjacent 
shard.

Given the above, one way to assign shards to subtasks is to do a linear mapping 
from hash-keys in range [0, 2^128) to subtask indices in [0, nSubtasks). For 
the 'coordinate' of each shard we pick the midpoint of the shard's range, so 
that neither subtask 0 nor subtask (nSubtasks - 1) is assigned too many shards.
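
A minimal sketch of this linear mapping is shown below. It is not the code 
from the patch; the class and method names are hypothetical, and it assumes 
each shard's range is given as inclusive starting and ending hash keys within 
[0, 2^128).

```java
import java.math.BigInteger;

public final class HashKeyAssignment {

    // Size of the Kinesis hash-key space: 2^128.
    private static final BigInteger HASH_KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    private HashKeyAssignment() {}

    /**
     * Hypothetical helper: maps a shard to a subtask index in [0, numSubtasks)
     * by scaling the midpoint of its hash-key range linearly onto the
     * subtask indices.
     */
    static int subtaskFor(BigInteger startingHashKey, BigInteger endingHashKey, int numSubtasks) {
        if (numSubtasks <= 0) {
            throw new IllegalArgumentException("numSubtasks must be positive");
        }
        // Midpoint of the shard's range (rounded down).
        BigInteger midpoint = startingHashKey.add(endingHashKey).shiftRight(1);
        // floor(midpoint * numSubtasks / 2^128); always below numSubtasks
        // because midpoint < 2^128.
        return midpoint
                .multiply(BigInteger.valueOf(numSubtasks))
                .divide(HASH_KEY_SPACE)
                .intValueExact();
    }
}
```

With four shards that each cover a quarter of the hash-key space and four 
subtasks, this places exactly one shard on each subtask.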

However, this will probably not be helpful for Kinesis users who don't randomly 
assign partition or hash keys to Kinesis records. The existing assigner is 
probably better for them.

I ran a simulation of the default shard assigner versus some alternatives, 
using shards taken from one of our Kinesis streams; results attached. The 
measure I used, which I call 'overload', is how many times more shards the 
most heavily loaded subtask has than is necessary. (DEFAULT is the default 
assigner, Sha256 is similar to the default but with a stronger hashing 
function, ShardId extracts the shard number from the shardId and uses that, and 
HashKey is the one I describe above.)
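
One possible reading of the 'overload' measure is sketched below. The exact 
formula used in the simulation is not stated here, so this is an assumption: 
it takes the necessary load to be ceil(totalShards / nSubtasks) and divides 
the largest per-subtask shard count by it. The class and method names are 
hypothetical.

```java
public final class OverloadMetric {

    private OverloadMetric() {}

    /**
     * Assumed definition: the shard count of the busiest subtask divided by
     * the smallest possible maximum, ceil(totalShards / numSubtasks).
     * A value of 1.0 means the assignment is as balanced as it can be.
     */
    static double overload(int[] shardsPerSubtask) {
        int total = 0;
        int max = 0;
        for (int count : shardsPerSubtask) {
            total += count;
            max = Math.max(max, count);
        }
        if (total == 0) {
            throw new IllegalArgumentException("at least one shard is required");
        }
        int n = shardsPerSubtask.length;
        int ideal = (total + n - 1) / n; // integer ceil(total / n)
        return (double) max / ideal;
    }
}
```

Under this reading, per-subtask counts of {5, 1, 1, 1} give an overload of 
2.5, since a perfectly even assignment of 8 shards to 4 subtasks would put 2 
shards on each.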

Patch is at: 
https://github.com/apache/flink/compare/master...john-karp:uniform-shard-assigner?expand=1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
