Another possibility, if you know the values of the keys in advance, is to
find a mapping that transforms the original keys into new keys that will,
in fact, end up in different key groups which, in turn, are assigned to
different slots (given a specific parallelism). This is ugly, but feasible.
For reference, the key group for a given key is
MathUtils.murmurHash(key.hashCode()) % maxParallelism
and a given key group will be assigned to the slot computed by
keyGroup * actualParallelism / maxParallelism
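The two formulas above can be explored offline to search for such a mapping. The following is a minimal Java sketch, not Flink code: `murmurHash` and `bitMix` are a re-implementation of Flink's `MathUtils` hashing as we recall it (verify it against the Flink version you run; inside a real job you would rely on Flink's own `KeyGroupRangeAssignment` instead), and `remapToDistinctSlots` is a hypothetical helper that brute-forces replacement integer keys landing on pairwise different subtasks. parallelism = 3 and maxParallelism = 128 are assumed values matching the three buckets in the question.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class KeyGroupSketch {

    // Re-implementation of Flink's MathUtils.murmurHash(int) as we recall it;
    // verify against the Flink version you deploy before relying on it.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        }
        return 0;
    }

    // Final avalanche step of murmur3 (Flink's MathUtils.bitMix, as we recall it).
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // key group = murmurHash(key.hashCode()) % maxParallelism
    static int keyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // slot (operator index) = keyGroup * parallelism / maxParallelism
    static int operatorIndex(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Hypothetical helper: scans candidate int keys 0, 1, 2, ... and keeps the first
    // one that lands on each still-unused subtask, so the replacement keys end up
    // on pairwise different subtasks.
    static List<Integer> remapToDistinctSlots(int numKeys, int parallelism, int maxParallelism) {
        if (numKeys > parallelism) {
            throw new IllegalArgumentException("more keys than subtasks");
        }
        List<Integer> replacements = new ArrayList<>();
        Set<Integer> usedSlots = new HashSet<>();
        for (int candidate = 0; replacements.size() < numKeys; candidate++) {
            int kg = keyGroup(candidate, maxParallelism);
            if (usedSlots.add(operatorIndex(kg, parallelism, maxParallelism))) {
                replacements.add(candidate);
            }
        }
        return replacements;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        int maxParallelism = 128;
        List<Integer> newKeys = remapToDistinctSlots(parallelism, parallelism, maxParallelism);
        for (int i = 0; i < newKeys.size(); i++) {
            int kg = keyGroup(newKeys.get(i), maxParallelism);
            System.out.println("original key " + i + " -> new key " + newKeys.get(i)
                    + " (key group " + kg + ", subtask "
                    + operatorIndex(kg, parallelism, maxParallelism) + ")");
        }
    }
}
```

The resulting list would then be used as a lookup table when keying the stream (original key i is replaced by new key i). The mapping is only valid for the specific parallelism and maxParallelism it was computed for, which is part of what makes the approach ugly; and with 4 original keys and 3 subtasks, at most 3 can be separated, so one subtask still receives two keys.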
David
On Wed, Nov 3, 2021 at 3:35 PM Schwalbe Matthias <[email protected]> wrote:
> Hi Yuval,
>
> Just a couple of comments:
>
> - Assuming that your 4 different keys are evenly distributed and
>   you send them to (only) 3 buckets, you would expect at least one bucket
>   to cover 2 of your keys, hence the 50% you are seeing
> - With low-entropy keys, avoiding data skew is quite difficult
> - But your situation could be worse: all 4 keys could end up in the
>   same bucket if the hash function in use happens to generate collisions
>   for the 4 keys, in which case 2 of your 3 buckets would not process any
>   events … this could also lead to watermarks not progressing …
> - There are two proposals for improving the situation:
>   - Use the same parallelism and max parallelism for the relevant
>     operators and implement a manual partitioner
>     - A manual partitioner is also useful when you want to reduce the
>       skew and know the exact distribution of your key space: you can
>       rearrange keys to even out the numbers
>   - A more sophisticated approach (if possible) is divide-and-conquer:
>     - Key by your ‘small’ key plus some arbitrary attribute with
>       higher entropy
>     - Window-aggregate first on that artificial key
>     - Aggregate the results on your original ‘small’ key
>     - This could be interesting for high-throughput situations where
>       you actually want to run with a parallelism higher than the number
>       of different ‘small’ keys
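A minimal sketch of the manual-partitioner idea, written in plain Java so it runs without Flink. `Partitioner` is redeclared locally with the same shape as Flink's `org.apache.flink.api.common.functions.Partitioner` (`int partition(K key, int numPartitions)`), and `KnownKeysPartitioner` is a hypothetical implementation that pins each known key to a slot chosen up front. It also shows why setting parallelism equal to max parallelism helps: with both equal to p, keyGroup * p / p is just keyGroup, so key group i goes to subtask i.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ManualPartitionerSketch {

    // Same shape as Flink's Partitioner<K>; redeclared so the sketch compiles without Flink.
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Hypothetical partitioner for a small, known key space: the i-th known key is
    // pinned to slot i (mod numPartitions); unknown keys fall back to their hashCode.
    static final class KnownKeysPartitioner implements Partitioner<Integer> {
        private final Map<Integer, Integer> slotOf = new HashMap<>();

        KnownKeysPartitioner(List<Integer> knownKeys) {
            for (int i = 0; i < knownKeys.size(); i++) {
                slotOf.put(knownKeys.get(i), i);
            }
        }

        @Override
        public int partition(Integer key, int numPartitions) {
            int raw = slotOf.getOrDefault(key, key.hashCode());
            return Math.floorMod(raw, numPartitions);
        }
    }

    // Operator index formula from the thread: keyGroup * parallelism / maxParallelism.
    static int operatorIndex(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many of the given keys each partition receives.
    static int[] load(Partitioner<Integer> p, List<Integer> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (int key : keys) {
            counts[p.partition(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        // With maxParallelism == parallelism, key group i maps to subtask i.
        for (int kg = 0; kg < parallelism; kg++) {
            System.out.println("key group " + kg + " -> subtask "
                    + operatorIndex(kg, parallelism, parallelism));
        }
        List<Integer> keys = List.of(0, 1, 2, 3);
        int[] counts = load(new KnownKeysPartitioner(keys), keys, parallelism);
        System.out.println(Arrays.toString(counts)); // prints [2, 1, 1]
    }
}
```

In a Flink job such a partitioner would be handed to `DataStream#partitionCustom`, which, as far as we know, yields a plain `DataStream` rather than a `KeyedStream`, so using keyed state downstream needs extra care. With 4 keys and 3 subtasks the best achievable load is still [2, 1, 1], consistent with the first comment in the list.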
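The divide-and-conquer idea can be sketched in plain Java, with in-memory maps standing in for the two keyed aggregation steps (windowing is left out). `Event`, its `userId` attribute and the fan-out of 8 are hypothetical: the salt is derived from the higher-entropy attribute so that one ‘small’ key is spread over up to 8 first-stage keys, and the second stage folds the partial results back onto the original key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoStageAggregationSketch {

    // Hypothetical event: a low-entropy "small" key plus a higher-entropy attribute.
    record Event(int smallKey, String userId, long value) {}

    // Artificial composite key used only in the first stage.
    record SaltedKey(int smallKey, int salt) {}

    // Stage 1: pre-aggregate per (smallKey, salt); the salt is the hash of the
    // high-entropy attribute modulo the fan-out, spreading each small key out.
    static Map<SaltedKey, Long> preAggregate(List<Event> events, int fanOut) {
        Map<SaltedKey, Long> partial = new HashMap<>();
        for (Event e : events) {
            int salt = Math.floorMod(e.userId().hashCode(), fanOut);
            partial.merge(new SaltedKey(e.smallKey(), salt), e.value(), Long::sum);
        }
        return partial;
    }

    // Stage 2: combine the partial results on the original small key.
    static Map<Integer, Long> finalAggregate(Map<SaltedKey, Long> partial) {
        Map<Integer, Long> totals = new HashMap<>();
        partial.forEach((k, v) -> totals.merge(k.smallKey(), v, Long::sum));
        return totals;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(0, "alice", 5), new Event(0, "bob", 7),
                new Event(1, "carol", 3), new Event(2, "dave", 4),
                new Event(3, "erin", 1), new Event(0, "frank", 2));
        Map<SaltedKey, Long> partial = preAggregate(events, 8);
        System.out.println(finalAggregate(partial));
    }
}
```

This only works for aggregates that can be combined from partial results, such as sums, counts, minimums or maximums; an exact median, for example, cannot simply be merged this way.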
>
> Hope this helps
>
> Thias
>
> *From:* Yuval Itzchakov <[email protected]>
> *Sent:* Wednesday, 3 November 2021 14:41
> *To:* user <[email protected]>
> *Subject:* Custom partitioning of keys with keyBy
>
> Hi,
>
> I have a use case where I'd like to partition a KeyedDataStream a bit
> differently from how Flink's default partitioning works with key groups.
>
> What I'd like to be able to do is take all my data and split it up evenly
> between 3 buckets which will store the data in the state. Using the key
> above works, but splits the data unevenly between the different key groups,
> as the key space is usually very small (0 - 3). What ends up happening is
> that sometimes 50% of the keys end up on the same operator index, whereas
> ideally I'd like to distribute them evenly between all operator indexes in
> the cluster.
>
> Is there any way of doing this?
>
> --
>
> Best Regards,
> Yuval Itzchakov.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>