Another possibility, if you know the key values in advance, is to find a mapping that transforms the original keys into new keys that end up in disjoint key groups, which in turn are assigned to different slots (given a specific parallelism). This is ugly, but feasible.

For reference, the key group for a given key is MathUtils.murmurHash(key.hashCode()) % maxParallelism, and a given key group will be assigned to the slot computed by keyGroup * actualParallelism / maxParallelism.

David
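A minimal sketch of that brute-force remapping search, assuming Flink's internal MathUtils.murmurHash from flink-core (the same hash the runtime's KeyGroupRangeAssignment applies to key hashes); the class name, key values, and search strategy are illustrative, not from the thread:

import org.apache.flink.util.MathUtils;

public class KeyRemapper {

    static int keyGroupFor(Object key, int maxParallelism) {
        // same formula as in David's message above
        return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
    }

    static int slotFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        int maxParallelism = 128;
        // With as many keys as slots we can search for a fully disjoint
        // assignment; with more keys than slots you would instead balance
        // the number of keys per slot.
        String[] originalKeys = {"0", "1", "2"};

        boolean[] slotTaken = new boolean[parallelism];
        for (String key : originalKeys) {
            // try suffixes until the remapped key lands in a free slot
            for (int suffix = 0; suffix < 1000; suffix++) {
                String candidate = key + "#" + suffix;
                int slot = slotFor(keyGroupFor(candidate, maxParallelism),
                        parallelism, maxParallelism);
                if (!slotTaken[slot]) {
                    slotTaken[slot] = true;
                    System.out.println(key + " -> " + candidate + " (slot " + slot + ")");
                    break;
                }
            }
        }
    }
}

The job would then apply the discovered mapping in a map() before keyBy(), and strip the suffix again downstream.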
On Wed, Nov 3, 2021 at 3:35 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Yuval,
>
> Just a couple of comments:
>
> - Assuming that all your 4 different keys are evenly distributed and you
>   send them to (only) 3 buckets, you would expect at least one bucket to
>   cover 2 of your keys, hence the 50%.
> - With low-entropy keys, avoiding data skew is quite difficult.
> - But your situation could be worse: all 4 keys could end up in the same
>   bucket if the hash function in use happens to generate collisions for
>   the 4 keys, in which case 2 of your 3 buckets would not process any
>   events ... this could also lead to watermarks not progressing.
> - There are two proposals for how to improve the situation:
>   - Use the same parallelism and max parallelism for the relevant
>     operators and implement a manual partitioner (see the first sketch
>     after this message).
>     - A manual partitioner is also good in situations where you want to
>       lower the bias and you know the exact distribution of your key
>       space, so you can rearrange keys to even out the numbers.
>   - More sophisticated (if possible), divide-and-conquer like (see the
>     second sketch after this message):
>     - Key by your 'small' key plus some arbitrary attribute with higher
>       entropy.
>     - Window-aggregate first on that artificial key.
>     - Aggregate the results on your original 'small' key.
>     - This can be interesting for high-throughput situations where you
>       actually want to run at a parallelism higher than the number of
>       distinct 'small' keys.
>
> Hope this helps
>
> Thias
>
>
> From: Yuval Itzchakov <yuva...@gmail.com>
> Sent: Wednesday, November 3, 2021 14:41
> To: user <user@flink.apache.org>
> Subject: Custom partitioning of keys with keyBy
>
> Hi,
>
> I have a use case where I'd like to partition a KeyedDataStream a bit
> differently from how Flink's default partitioning works with key groups.
>
> What I'd like to be able to do is take all my data and split it evenly
> between 3 buckets which will store the data in state. Using the key above
> works, but splits the data unevenly between the different key groups,
> since the key space is usually very small (0-3). What ends up happening
> is that sometimes 50% of the keys end up on the same operator index,
> where ideally I'd like to distribute them evenly across all operator
> indexes in the cluster.
>
> Is there any way of doing this?
>
> --
> Best Regards,
> Yuval Itzchakov.
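A minimal sketch of the first proposal (the manual partitioner), using the DataStream API's partitionCustom; the event type and all names are illustrative, not from the thread. Note that partitionCustom yields a plain DataStream rather than a KeyedStream, so keyed state and keyBy-based windows are not available on the partitioned stream:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ManualPartitionerSketch {

    // With a tiny key space (0-3) the keys can be routed to channels
    // directly instead of going through hashing and key groups.
    static class SmallKeyPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            return key % numPartitions;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // same parallelism everywhere, per the proposal above
        env.setParallelism(3);

        DataStream<Integer> events = env.fromElements(0, 1, 2, 3, 0, 1, 2, 3);

        events.partitionCustom(new SmallKeyPartitioner(),
                        new KeySelector<Integer, Integer>() {
                            @Override
                            public Integer getKey(Integer value) {
                                return value;
                            }
                        })
                .map(v -> "key " + v)
                .print();

        env.execute("manual partitioner sketch");
    }
}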
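And a minimal sketch of the second, divide-and-conquer proposal, written as a two-phase sum; the salt fan-out, window size, and all names are illustrative assumptions:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.ThreadLocalRandom;

public class TwoPhaseAggregationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // (smallKey, value) events with a key space of just 0-3
        DataStream<Tuple2<Integer, Long>> events = env.fromElements(
                Tuple2.of(0, 1L), Tuple2.of(1, 2L), Tuple2.of(0, 3L), Tuple2.of(2, 4L));

        final int saltBuckets = 16; // pre-aggregation fan-out, tune to parallelism

        events
                // phase 1: attach a random salt so the hot keys spread over subtasks
                .map(e -> Tuple3.of(e.f0,
                        ThreadLocalRandom.current().nextInt(saltBuckets), e.f1))
                .returns(Types.TUPLE(Types.INT, Types.INT, Types.LONG))
                .keyBy(e -> e.f0 + ":" + e.f1, Types.STRING)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2))
                // phase 2: combine the partial sums per original small key
                .keyBy(e -> e.f0, Types.INT)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2))
                .map(e -> Tuple2.of(e.f0, e.f2))
                .returns(Types.TUPLE(Types.INT, Types.LONG))
                .print();

        env.execute("two-phase aggregation sketch");
    }
}

The salted first phase spreads each hot key over up to saltBuckets subtasks; the second phase then only has to fold the few partial results per original key, since at most one record per salt survives each phase-1 window.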