I think I had a similar scenario several months ago; here is the relevant code:
val MAX_PARALLELISM = 16
val KEY_RAND_SALT = "73b46"

logSource.keyBy { value =>
  val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, MAX_PARALLELISM)
  s"$KEY_RAND_SALT$keyGroup"
}

The keyGroup is just like your bucket id, and the KEY_RAND_SALT was generated by some script so that the bucket ids map evenly to operators under the max parallelism.

On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov <yuva...@gmail.com>, wrote:
> Hi,
> I have a use-case where I'd like to partition a KeyedDataStream a bit
> differently than how Flink's default partitioning works with key groups.
>
> <image.png>
> What I'd like to be able to do is take all my data and split it up evenly
> between 3 buckets which will store the data in the state. Using the key above
> works, but splits the data unevenly between the different key groups, as
> usually the key space is very small (0 - 3). What ends up happening is that
> sometimes 50% of the keys end up on the same operator index, where ideally
> I'd like to distribute it evenly between all operator indexes in the cluster.
>
> Is there any way of doing this?
> --
> Best Regards,
> Yuval Itzchakov.
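The salt-generation script mentioned above isn't shown, but the idea can be sketched as a brute-force search: try candidate salts until the salted bucket keys land on distinct operator indexes. This is a standalone Java sketch, not Flink code; `assignToKeyGroup` below is a plain `hashCode`-modulo stand-in for Flink's `KeyGroupRangeAssignment` (which additionally murmur-hashes the key's hash), and `operatorIndex` mirrors the `keyGroup * parallelism / maxParallelism` mapping. The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

public class SaltSearch {
    // Stand-in for Flink's key-group assignment. Assumption: plain
    // hashCode % maxParallelism; the real implementation also applies
    // a murmur hash on top of key.hashCode().
    static int assignToKeyGroup(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Operator index for a key group: keyGroup * parallelism / maxParallelism.
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Brute-force a salt so that buckets 0..numBuckets-1, keyed as
    // salt + bucket, spread over numBuckets distinct operator indexes.
    static String findSalt(int numBuckets, int maxParallelism, int parallelism) {
        for (int candidate = 0; ; candidate++) {
            String salt = Integer.toHexString(candidate);
            Set<Integer> operators = new HashSet<>();
            for (int bucket = 0; bucket < numBuckets; bucket++) {
                int keyGroup = assignToKeyGroup(salt + bucket, maxParallelism);
                operators.add(operatorIndex(keyGroup, maxParallelism, parallelism));
            }
            if (operators.size() == numBuckets) {
                return salt;
            }
        }
    }

    public static void main(String[] args) {
        // 3 buckets (as in the question), max parallelism 16, parallelism 3.
        System.out.println("salt=" + findSalt(3, 16, 3));
    }
}
```

Once a salt is found offline, it can be hard-coded (like "73b46" above) and prepended to the bucket id inside keyBy, so each bucket's state deterministically lands on its own operator.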