Thank you Schwalbe, David and Naitong for your answers!

*David*: This is what we're doing at the moment, and I wanted to know whether there's a simpler approach. Here is what we have so far: https://gist.github.com/YuvalItzchakov/9441a4a0e80609e534e69804e94cb57b

*Naitong*: keyBy internally re-hashes the key you provide it. How do you make sure that the re-hashed key still falls into the desired key group range?
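For context: keyBy assigns a key to a key group as murmurHash(key.hashCode) % maxParallelism, and then maps the key group to an operator index; both steps are exposed by KeyGroupRangeAssignment, so a salt can be brute-forced offline until the composite keys land evenly. A minimal sketch of what such a script might look like (the constants and the balance check here are my assumptions, not Naitong's actual script):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object FindSalt {
  val MaxParallelism = 16 // the job's max parallelism, as in Naitong's code below
  val Parallelism    = 3  // operator parallelism, i.e. the number of buckets

  def main(args: Array[String]): Unit = {
    // Try salts until the composite keys s"$salt$keyGroup" for all 16 key
    // groups spread (almost) evenly over the 3 operator instances.
    val salt = Iterator.from(0).map(i => f"$i%05x").find { s =>
      val counts = (0 until MaxParallelism)
        .groupBy { kg =>
          // key group of the composite key -> operator index,
          // exactly the routing keyBy applies at runtime
          KeyGroupRangeAssignment.assignKeyToParallelOperator(
            s"$s$kg", MaxParallelism, Parallelism)
        }
        .values.map(_.size)
      counts.size == Parallelism && counts.max - counts.min <= 1
    }.get
    println(s"salt = $salt")
  }
}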
*Schwalbe*:

- Assuming that all your 4 different keys are evenly distributed and you send them to (only) 3 buckets, you would expect at least one bucket to cover 2 of your keys, hence the 50%.
  - You're right, and this is actually the behavior I want. I don't want the keys to be truly uniformly distributed, since I want to batch multiple keys together in the same bucket.
- With low-entropy keys, avoiding data skew is quite difficult.
  - I understand, and we are well aware of the implications.
- But your situation could be worse: all 4 keys could end up in the same bucket if the hash function in use happens to generate collisions for the 4 keys, in which case 2 of your 3 buckets would not process any events … this could also lead to watermarks not progressing.
  - We take care of this internally, as we understand the buckets may end up skewed. I don't care about watermarks at this stage.
- There are two proposals for improving the situation:
  - Use the same parallelism and max parallelism for the relevant operators and implement a manual partitioner. A manual partitioner is also good in situations where you want to lower the bias, know the exact distribution of your key space, and can rearrange keys to even out the numbers.
    - I looked into custom partitioning, but it doesn't seem to work with a KeyedStream, and I need the distribution to happen when keying the stream (see the first sketch at the end of this mail).
  - More sophisticated (if possible), divide-and-conquer like:
    - Key by your ‘small’ key plus some arbitrary attribute with higher entropy.
    - Window-aggregate first on that artificial key.
    - Aggregate the results on your original ‘small’ key.
    - This could be interesting for high-throughput situations where you actually want to run at a parallelism higher than the number of different ‘small’ keys.
    - Interesting idea, but I'm not sure I follow. Could you possibly provide a sketch of the transformations on the stream? (See the second sketch at the end of this mail for my best guess.)

On Thu, Nov 4, 2021 at 5:48 AM naitong Xiao <xiaonait...@gmail.com> wrote:

> I think I had a similar scenario several months ago, here is my related
> code:
>
> val MAX_PARALLELISM = 16
> val KEY_RAND_SALT = "73b46"
>
> logSource.keyBy { value =>
>   val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, MAX_PARALLELISM)
>   s"$KEY_RAND_SALT$keyGroup"
> }
>
> The keyGroup is just like your bucket id, and the KEY_RAND_SALT was
> generated by some script to map bucket ids evenly to operators under the
> max parallelism.
>
> On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov <yuva...@gmail.com>, wrote:
>
> Hi,
> I have a use-case where I'd like to partition a KeyedStream a bit
> differently than how Flink's default partitioning works with key groups.
>
> <image.png>
>
> What I'd like to be able to do is take all my data and split it up evenly
> between 3 buckets, which will store the data in state. Using the key above
> works, but it splits the data unevenly between the different key groups,
> since the key space is usually very small (0 - 3). What ends up happening
> is that sometimes 50% of the keys end up on the same operator index, where
> ideally I'd like to distribute them evenly across all operator indexes in
> the cluster.
>
> Is there any way of doing this?
> --
> Best Regards,
> Yuval Itzchakov.

--
Best Regards,
Yuval Itzchakov.
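Sketch 1 (the manual-partitioner proposal): a minimal, untested sketch assuming a hypothetical Event(smallKey: Int, value: Long) record. Note that partitionCustom yields a plain DataStream rather than a KeyedStream, which is exactly the limitation mentioned above.

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

case class Event(smallKey: Int, value: Long)

// Route the 4 low-entropy keys explicitly onto 3 downstream channels.
// partition() returns the downstream subtask index directly, no key
// groups involved; Schwalbe's proposal pairs this with parallelism ==
// max parallelism on the relevant operators.
class SmallKeyPartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int =
    key % numPartitions // or a hand-tuned lookup table to even out the bias
}

def manuallyPartitioned(events: DataStream[Event]): DataStream[Event] =
  events.partitionCustom(new SmallKeyPartitioner, _.smallKey)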
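Sketch 2 (the divide-and-conquer proposal, as far as I follow it): pre-aggregate per window on a salted key, then combine the partial results on the original ‘small’ key. The Event record, the 16-way salt, the 10-second windows and the sum are only stand-ins. The salt is attached to the record first, because key selectors must be deterministic.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random

case class Event(smallKey: Int, value: Long, salt: Int = 0)

def divideAndConquer(events: DataStream[Event]): DataStream[Event] =
  events
    // Attach a random salt to the record itself; calling Random inside
    // keyBy would make the key selector non-deterministic.
    .map(e => e.copy(salt = Random.nextInt(16)))
    // 1) Pre-aggregate on the artificial, higher-entropy key.
    .keyBy(e => (e.smallKey, e.salt))
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .reduce((a, b) => a.copy(value = a.value + b.value))
    // 2) Combine the partial results on the original ‘small’ key.
    .keyBy(_.smallKey)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .reduce((a, b) => a.copy(value = a.value + b.value))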