I think I had a similar scenario several months ago; here is my related code:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

val MAX_PARALLELISM = 16
val KEY_RAND_SALT = "73b46"

logSource.keyBy { value =>
  // Find which key group the raw id would land in, then prepend a salt
  // chosen so the salted keys spread evenly across operators.
  val keyGroup =
    KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, MAX_PARALLELISM)
  s"$KEY_RAND_SALT$keyGroup"
}

The keyGroup is effectively your bucket id, and KEY_RAND_SALT was generated by a script that searches for a salt under which the bucket ids map evenly to operators at the given max parallelism.
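For reference, that salt search can be sketched outside the JVM. This is a hypothetical reconstruction, not the original script: it assumes Flink assigns a String key to a key group via `MathUtils.murmurHash(key.hashCode()) % maxParallelism` (as `KeyGroupRangeAssignment.assignToKeyGroup` does), and it simply brute-forces hex salts until every bucket id lands in a distinct key group:

```python
import itertools

MAX_PARALLELISM = 16
NUM_BUCKETS = 3

def java_string_hash(s: str) -> int:
    """Java's String.hashCode(), as a signed 32-bit int."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h

def murmur_hash(code: int) -> int:
    """Assumed port of Flink's MathUtils.murmurHash(int) (murmur3 over one int)."""
    def rotl32(x: int, n: int) -> int:
        return ((x << n) | (x >> (32 - n))) & 0xFFFFFFFF
    c = code & 0xFFFFFFFF
    c = rotl32(c * 0xCC9E2D51 & 0xFFFFFFFF, 15)
    c = rotl32(c * 0x1B873593 & 0xFFFFFFFF, 13)
    c = (c * 5 + 0xE6546B64) & 0xFFFFFFFF
    c ^= 4                               # input length in bytes
    c ^= c >> 16
    c = c * 0x85EBCA6B & 0xFFFFFFFF
    c ^= c >> 13
    c = c * 0xC2B2AE35 & 0xFFFFFFFF
    c ^= c >> 16
    if c < (1 << 31):                    # non-negative as a signed int
        return c
    neg = (1 << 32) - c                  # Flink returns -code for negatives...
    return 0 if neg == (1 << 31) else neg  # ...and 0 for Integer.MIN_VALUE

def key_group(key: str, max_parallelism: int = MAX_PARALLELISM) -> int:
    """Sketch of KeyGroupRangeAssignment.assignToKeyGroup for String keys."""
    return murmur_hash(java_string_hash(key)) % max_parallelism

def find_salt(num_buckets: int = NUM_BUCKETS) -> str:
    """Brute-force a hex salt so every bucket id hits a distinct key group."""
    for length in itertools.count(1):
        for combo in itertools.product("0123456789abcdef", repeat=length):
            salt = "".join(combo)
            groups = {key_group(f"{salt}{b}") for b in range(num_buckets)}
            if len(groups) == num_buckets:
                return salt
```

Distinct key groups are a simplification: when parallelism is lower than max parallelism, ranges of key groups map to one operator, so a stricter script would check the resulting operator indexes instead.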

On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov <yuva...@gmail.com>, wrote:
> Hi,
> I have a use-case where I'd like to partition a KeyedDataStream a bit 
> differently from how Flink's default partitioning works with key groups.
>
> <image.png>
> What I'd like to be able to do is take all my data and split it evenly 
> between 3 buckets which will store the data in the state. Using the key above 
> works, but splits the data unevenly between the different key groups, since 
> the key space is usually very small (0 - 3). What ends up happening is that 
> sometimes 50% of the keys end up on the same operator index, whereas ideally 
> I'd like to distribute them evenly between all operator indexes in the cluster.
>
> Is there any way of doing this?
> --
> Best Regards,
> Yuval Itzchakov.
