I think I had a similar scenario several months ago; here is my related code:
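import org.apache.flink.runtime.state.KeyGroupRangeAssignment

val MAX_PARALLELISM = 16
val KEY_RAND_SALT = "73b46"
logSource.keyBy { value =>
  // Bucket id: the key group Flink would compute for the device id (0 until MAX_PARALLELISM)
  val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, MAX_PARALLELISM)
  // Prefix the bucket id with a precomputed salt so the resulting keys spread evenly over operators
  s"$KEY_RAND_SALT$keyGroup"
}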
The keyGroup plays the role of your bucket id, and KEY_RAND_SALT was found by
a small script so that the salted bucket keys are mapped evenly to the operators
under the given max parallelism.
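The salt-generating script isn't included above, so here is a rough sketch of what such a search could look like. Everything in it (the object `SaltSearch`, its helpers, the hex-counter salt candidates and the example sizes) is hypothetical. It reproduces Flink's key-to-operator routing (key hash, then key group, then operator index), tries candidate salts, and keeps the first one for which the keys "<salt>0" ... "<salt>15" land on the operators with loads differing by at most one. The hash functions are my re-implementation of Flink's `MathUtils.murmurHash`, which is an assumption; in practice, calling `KeyGroupRangeAssignment.assignKeyToParallelOperator` from flink-runtime is safer because it cannot drift from the Flink version you run.

```scala
// Hypothetical offline helper: finds a salt so that the keys "<salt>0" ... "<salt>(buckets-1)"
// are spread evenly over the parallel operator instances of a keyBy.
// The hash functions re-implement Flink's MathUtils.bitMix / murmurHash and the
// KeyGroupRangeAssignment routing (assumed to match your Flink version; verify before use).
object SaltSearch {

  // Re-implementation of Flink's MathUtils.bitMix (murmur3 finalizer)
  private def bitMix(x: Int): Int = {
    var h = x
    h ^= h >>> 16
    h *= 0x85ebca6bL.toInt
    h ^= h >>> 13
    h *= 0xc2b2ae35L.toInt
    h ^= h >>> 16
    h
  }

  // Re-implementation of Flink's MathUtils.murmurHash(int); result is never negative
  private def murmurHash(x: Int): Int = {
    var h = x
    h *= 0xcc9e2d51L.toInt
    h = Integer.rotateLeft(h, 15)
    h *= 0x1b873593
    h = Integer.rotateLeft(h, 13)
    h = h * 5 + 0xe6546b64L.toInt
    h ^= 4
    h = bitMix(h)
    if (h >= 0) h else if (h != Int.MinValue) -h else 0
  }

  // key -> key group -> operator index, the two steps keyBy uses to route a record
  def operatorIndex(key: Any, maxParallelism: Int, parallelism: Int): Int = {
    val keyGroup = murmurHash(key.hashCode) % maxParallelism
    keyGroup * parallelism / maxParallelism
  }

  // How many of the salted bucket keys land on each operator instance
  def loads(salt: String, buckets: Int, maxParallelism: Int, parallelism: Int): Seq[Int] = {
    val counts = Array.fill(parallelism)(0)
    for (b <- 0 until buckets)
      counts(operatorIndex(s"$salt$b", maxParallelism, parallelism)) += 1
    counts.toSeq
  }

  // Tries the hex strings of 0, 1, 2, ... and returns the first whose loads differ by at most one
  def findSalt(buckets: Int, maxParallelism: Int, parallelism: Int,
               maxTries: Int = 1000000): Option[String] =
    Iterator.range(0, maxTries)
      .map(i => Integer.toHexString(i))
      .find { salt =>
        val l = loads(salt, buckets, maxParallelism, parallelism)
        l.max - l.min <= 1
      }

  def main(args: Array[String]): Unit = {
    // Example values (assumptions): 16 buckets, job max parallelism 128, operator parallelism 4
    findSalt(buckets = 16, maxParallelism = 128, parallelism = 4) match {
      case Some(salt) => println(s"salt = $salt, loads = ${loads(salt, 16, 128, 4)}")
      case None       => println("no balanced salt found")
    }
  }
}
```

The search has to use the job's actual max parallelism (Flink's default is 128 for small parallelism) and the keyed operator's parallelism. Because the balance only holds for that pair, the salt has to be recomputed whenever either one changes.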
On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov <[email protected]>, wrote:
> Hi,
> I have a use case where I'd like to partition a keyed DataStream a bit
> differently than how Flink's default partitioning works with key groups.
>
> <image.png>
> What I'd like to be able to do is take all my data and split it up evenly
> between 3 buckets, which will store the data in the state. Using the key above
> works, but splits the data unevenly between the different key groups, as
> the key space is usually very small (0 - 3). What ends up happening is that
> sometimes 50% of the keys end up on the same operator index, whereas ideally
> I'd like to distribute them evenly across all operator indexes in the cluster.
>
> Is there any way of doing this?
> --
> Best Regards,
> Yuval Itzchakov.