John Karp created FLINK-24639:
---------------------------------
Summary: Improve assignment of Kinesis shards to subtasks
Key: FLINK-24639
URL: https://issues.apache.org/jira/browse/FLINK-24639
Project: Flink
Issue Type: Improvement
Components: Connectors / Kinesis
Reporter: John Karp
Attachments: Screen Shot 2021-10-25 at 5.11.29 PM.png
The default assigner of Kinesis shards to Flink subtasks simply takes the
hashCode() of the StreamShardHandle (an integer) and interprets it modulo the
number of subtasks. The result is a pseudo-random but deterministic assignment
of shards to subtasks.
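A minimal sketch of hashCode-modulo assignment follows, assuming the hash is reduced as Math.abs(hash % numSubtasks). The class, the shardHash helper and the "my-stream" name are hypothetical: a plain string key stands in for StreamShardHandle, whose real hashCode() covers more fields. The point is only that nothing in this scheme balances the per-subtask shard counts.

```java
import java.util.Arrays;

// Hypothetical sketch of hashCode-modulo assignment; a string key stands in
// for StreamShardHandle, whose real hashCode() covers more fields.
class DefaultAssignmentSketch {

    // Hypothetical shard identity: stream name plus shard ID.
    static int shardHash(String streamName, String shardId) {
        return (streamName + "/" + shardId).hashCode();
    }

    // Reduce the hash modulo the subtask count; Math.abs is applied after %
    // so that negative hashes still map into [0, numSubtasks).
    static int assignSubtask(int hash, int numSubtasks) {
        return Math.abs(hash % numSubtasks);
    }

    public static void main(String[] args) {
        int numSubtasks = 4;
        int[] shardsPerSubtask = new int[numSubtasks];
        for (int i = 0; i < 16; i++) {
            String shardId = String.format("shardId-%012d", i);
            shardsPerSubtask[assignSubtask(shardHash("my-stream", shardId), numSubtasks)]++;
        }
        // Deterministic across runs, but not guaranteed to be 4 shards per subtask.
        System.out.println(Arrays.toString(shardsPerSubtask));
    }
}
```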
However, this can leave some subtasks with several times as many shards as
others. To keep those unlucky subtasks from being overloaded, the whole Flink
cluster must be over-provisioned, so that every subtask has enough headroom to
absorb an over-assignment of shards.
We can do better, at least when Kinesis is used in the common way. Each record
sent to a Kinesis stream has a hash key in the range [0, 2^128), which
determines the shard the record goes to; each shard owns a contiguous range of
hash keys. By default, Kinesis gives each shard an equal fraction of the
hash-key space, and when you scale up or down with UpdateShardCount, it tries
to keep those fractions as equal as possible. Also, a shard's hash-key range is
fixed at creation; the shard can only be replaced by new shards, either by
splitting it or by merging it with an adjacent shard.
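To make the hash-key model concrete, here is a sketch of how a record's partition key maps to a shard. It assumes what the AWS documentation describes: the partition key (encoded here as UTF-8, an assumption) is MD5-hashed and the 16-byte digest is read as an unsigned 128-bit integer, unless the producer supplies an explicit hash key. The equal-split helper startingHashKey is hypothetical; a real stream reports each shard's range through the Kinesis API.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch: partition key -> 128-bit hash key -> owning shard.
class HashKeySketch {
    // Kinesis hash keys lie in [0, 2^128).
    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the partition key, read as an unsigned 128-bit integer.
    static BigInteger hashKeyFor(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: treat bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }

    // Hypothetical helper: first hash key of shard i when the space is cut
    // into n equal slices (rounding down at each boundary).
    static BigInteger startingHashKey(int i, int n) {
        return HASH_SPACE.multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(n));
    }

    // The owning shard is the last one whose starting key is <= hashKey.
    static int shardIndexFor(BigInteger hashKey, int n) {
        int index = 0;
        for (int i = 1; i < n; i++) {
            if (startingHashKey(i, n).compareTo(hashKey) <= 0) {
                index = i;
            }
        }
        return index;
    }

    public static void main(String[] args) {
        int numShards = 4;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            BigInteger hashKey = hashKeyFor(key);
            System.out.println(key + " -> hash key " + hashKey
                    + " -> shard " + shardIndexFor(hashKey, numShards));
        }
    }
}
```

Because MD5 output is close to uniform, randomly chosen partition keys spread evenly over the hash-key space, which is what the assigner below relies on.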
Given the above, one way to assign shards to subtasks is a linear mapping from
hash keys in [0, 2^128) to subtask indices in [0, nSubtasks). As each shard's
'coordinate' we pick the midpoint of its hash-key range, so that neither
subtask 0 nor subtask (n-1) is assigned too many shards.
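The midpoint mapping can be sketched as follows. This is an illustrative version, not the code in the linked patch; the class and method names are hypothetical, and the inputs are assumed to be the shard's inclusive StartingHashKey/EndingHashKey values parsed into BigInteger, since 128-bit keys do not fit in a long.

```java
import java.math.BigInteger;
import java.util.Arrays;

// Hypothetical sketch of the hash-key midpoint assigner described above.
class MidpointShardAssigner {
    private static final int HASH_KEY_BITS = 128; // hash keys lie in [0, 2^128)

    // startingHashKey and endingHashKey are the shard's inclusive bounds, e.g.
    // parsed from the decimal StartingHashKey/EndingHashKey strings Kinesis reports.
    static int assignSubtask(BigInteger startingHashKey, BigInteger endingHashKey, int numSubtasks) {
        // The midpoint of the shard's range is the shard's "coordinate".
        BigInteger midpoint = startingHashKey.add(endingHashKey).shiftRight(1);
        // floor(midpoint * numSubtasks / 2^128); since midpoint < 2^128,
        // the result is always in [0, numSubtasks).
        return midpoint.multiply(BigInteger.valueOf(numSubtasks))
                .shiftRight(HASH_KEY_BITS)
                .intValueExact();
    }

    public static void main(String[] args) {
        int numShards = 8;
        int numSubtasks = 4;
        BigInteger sliceSize = BigInteger.ONE.shiftLeft(HASH_KEY_BITS)
                .divide(BigInteger.valueOf(numShards));
        int[] shardsPerSubtask = new int[numSubtasks];
        for (int i = 0; i < numShards; i++) {
            BigInteger start = sliceSize.multiply(BigInteger.valueOf(i));
            BigInteger end = start.add(sliceSize).subtract(BigInteger.ONE);
            shardsPerSubtask[assignSubtask(start, end, numSubtasks)]++;
        }
        System.out.println(Arrays.toString(shardsPerSubtask)); // prints [2, 2, 2, 2]
    }
}
```

Using the midpoint rather than the starting key matters at the edges: with starting keys, the shard beginning at 0 always maps to subtask 0, so subtask 0 tends to collect one extra shard while subtask (n-1) gets fewer.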
However, this will probably not help Kinesis users who don't assign partition
or hash keys to their records randomly; the existing assigner is probably
better for them.
I ran a simulation comparing the default shard assigner with some
alternatives, using the shards of one of our Kinesis streams; the results are
attached. The metric I used, which I call 'overload', measures how many times
more shards the most heavily loaded subtask has than necessary. (DEFAULT is the
default assigner; Sha256 is similar to the default but uses a stronger hash
function; ShardId extracts the shard number from the shardId and uses that;
HashKey is the assigner described above.)
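One way to compute the 'overload' metric is sketched below, assuming "necessary" means the minimum achievable load, ceil(numShards / numSubtasks). The ShardId variant is reconstructed under the further assumption that the parsed shard number is taken modulo the subtask count. Both are hypothetical reconstructions for illustration, not the simulation code behind the attached results.

```java
import java.util.Arrays;

// Hypothetical reconstructions for illustration only.
class OverloadSketch {

    // 'Overload': shards on the busiest subtask divided by the fewest it could
    // have had, ceil(numShards / numSubtasks). 1.0 means perfectly balanced.
    static double overload(int[] subtaskOfShard, int numSubtasks) {
        int[] counts = new int[numSubtasks];
        for (int subtask : subtaskOfShard) {
            counts[subtask]++;
        }
        int busiest = Arrays.stream(counts).max().orElse(0);
        int ideal = (subtaskOfShard.length + numSubtasks - 1) / numSubtasks;
        return ideal == 0 ? 1.0 : (double) busiest / ideal;
    }

    // Assumed 'ShardId' variant: parse the number out of IDs such as
    // "shardId-000000000012" and take it modulo the subtask count.
    static int assignByShardNumber(String shardId, int numSubtasks) {
        long shardNumber = Long.parseLong(shardId.substring(shardId.lastIndexOf('-') + 1));
        return (int) (shardNumber % numSubtasks);
    }

    public static void main(String[] args) {
        int numSubtasks = 3;
        String[] shardIds = {
            "shardId-000000000004", "shardId-000000000005",
            "shardId-000000000006", "shardId-000000000007"
        };
        int[] assignment = new int[shardIds.length];
        for (int i = 0; i < shardIds.length; i++) {
            assignment[i] = assignByShardNumber(shardIds[i], numSubtasks);
        }
        System.out.println(overload(assignment, numSubtasks)); // prints 1.0
    }
}
```

Contiguous shard numbers balance perfectly here, but after resharding the open shards' numbers are generally no longer contiguous, so the ShardId variant can become uneven as well.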
Patch is at:
https://github.com/apache/flink/compare/master...john-karp:uniform-shard-assigner?expand=1