Another possibility, if you know the values of the keys in advance, is to
find a mapping that transforms the original keys into new keys that end up
in disjoint key groups, which in turn are assigned to different slots
(given a specific parallelism). This is ugly, but feasible.

For reference, the key group for a given key is

    MathUtils.murmurHash(key.hashCode()) % maxParallelism

and a given key group will be assigned to the operator subtask (slot) computed by

    keyGroup * actualParallelism / maxParallelism
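
As a rough, untested sketch of that brute-force approach: for each original
key, append a salt and increment it until the remapped key lands on the
subtask you want. The KeyRemapper class, the "#salt" scheme, and the target
assignment (key % 3) below are made up for illustration, and 128 is only an
assumed max parallelism; use your job's actual setting:

    import org.apache.flink.util.MathUtils;

    public class KeyRemapper {

        // operator index for a key, using the two formulas above
        static int operatorIndex(Object key, int maxParallelism, int parallelism) {
            int keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism;
            return keyGroup * parallelism / maxParallelism;
        }

        public static void main(String[] args) {
            int maxParallelism = 128;  // assumption; check your job configuration
            int parallelism = 3;
            for (int key = 0; key < 4; key++) {
                int target = key % parallelism;  // arbitrary choice of target subtask
                for (int salt = 0; ; salt++) {
                    String candidate = key + "#" + salt;
                    if (operatorIndex(candidate, maxParallelism, parallelism) == target) {
                        System.out.println(key + " -> " + candidate + " (subtask " + target + ")");
                        break;
                    }
                }
            }
        }
    }

You would then key the stream by the remapped string instead of the original
integer, and strip the salt wherever you need the original key back.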

David



On Wed, Nov 3, 2021 at 3:35 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Yuval,
>
>
>
> Just a couple of comments:
>
>
>
>    - Assuming that all your 4 different keys are evenly distributed, and
>    you send them to (only) 3 buckets, you would expect at least one bucket to
>    cover 2 of your keys, hence the 50%
>    - With low-entropy keys, avoiding data skew is quite difficult
>    - But your situation could be worse: all 4 keys could end up in the
>    same bucket if the hash function in use happens to generate collisions
>    for the 4 keys, in which case 2 of your 3 buckets would not process any
>    events … this could also lead to watermarks not progressing …
>    - There are two proposals for how to improve the situation:
>       - Use the same parallelism and max parallelism for the relevant
>       operators and implement a manual partitioner (see the sketch after
>       this list)
>          - A manual partitioner is also useful in situations where you
>          want to reduce the skew, know the distribution of your key space
>          exactly, and can rearrange keys to even out the numbers
>       - More sophisticated (if possible), divide-and-conquer like
>       (sketched after this list as well):
>          - Key by your ‘small’ key plus some arbitrary attribute with
>          higher entropy
>          - Window-aggregate first on that artificial key
>          - Aggregate the results on your original ‘small’ key
>          - This could be interesting for high-throughput situations where
>          you actually want to run with a parallelism higher than the
>          number of different ‘small’ keys
>
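> For the first proposal, a minimal, untested sketch using partitionCustom
> could look like the following; the Event type and its getKey() accessor
> are only placeholders for your actual record type:
>
>     import org.apache.flink.api.common.functions.Partitioner;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>
>     DataStream<Event> events = ...;  // your source, small keys in 0..3
>
>     DataStream<Event> partitioned = events.partitionCustom(
>         new Partitioner<Integer>() {
>             @Override
>             public int partition(Integer key, int numPartitions) {
>                 // spread the 4 small keys round-robin over the subtasks
>                 return key % numPartitions;
>             }
>         },
>         event -> event.getKey());
>
> Note that after partitionCustom the stream is no longer a KeyedStream, so
> for keyed state downstream you would either have to keyBy again (which
> re-shuffles) or look into the experimental
> DataStreamUtils.reinterpretAsKeyedStream, which expects the data to be
> partitioned exactly the way keyBy would partition it.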
>
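> And a rough sketch of the second, two-stage idea, here counting events per
> ‘small’ key: the Event fields (getKey(), getUserId()), the 16 salt
> buckets, and the 1-minute windows are all assumptions, and timestamps /
> watermarks are assumed to be assigned already:
>
>     import org.apache.flink.api.common.typeinfo.Types;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.api.java.tuple.Tuple3;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>     import org.apache.flink.streaming.api.windowing.time.Time;
>
>     // events: the same DataStream<Event> as in the sketch above
>     // (small key, salt from a higher-entropy field, count of 1) per event
>     DataStream<Tuple3<Integer, Integer, Long>> preKeyed = events
>         .map(e -> Tuple3.of(e.getKey(), (int) (e.getUserId() % 16), 1L))
>         .returns(Types.TUPLE(Types.INT, Types.INT, Types.LONG));
>
>     // stage 1: pre-aggregate on the artificial composite key
>     DataStream<Tuple3<Integer, Integer, Long>> partials = preKeyed
>         .keyBy(t -> t.f0 + "#" + t.f1, Types.STRING)
>         .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>         .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2));
>
>     // stage 2: combine the partial counts per original ‘small’ key
>     DataStream<Tuple2<Integer, Long>> totals = partials
>         .map(t -> Tuple2.of(t.f0, t.f2))
>         .returns(Types.TUPLE(Types.INT, Types.LONG))
>         .keyBy(t -> t.f0, Types.INT)
>         .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>         .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
>
> The first stage spreads the load over many key groups; the second stage
> only sees one pre-aggregated record per salt bucket and window, so the
> skew on the original key no longer matters much.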
>
> Hope this helps
>
>
>
> Thias
>
>
>
>
>
> *From:* Yuval Itzchakov <yuva...@gmail.com>
> *Sent:* Mittwoch, 3. November 2021 14:41
> *To:* user <user@flink.apache.org>
> *Subject:* Custom partitioning of keys with keyBy
>
>
>
> Hi,
>
> I have a use-case where I'd like to partition a KeyedDataStream a bit
> differently than how Flink's default partitioning works with key groups.
>
>
>
> What I'd like to be able to do is take all my data and split it up evenly
> between 3 buckets which will store the data in the state. Using the key
> above works, but splits the data unevenly between the different key groups,
> as usually the key space is very small (0 - 3). What ends up happening is
> that sometimes 50% of the keys end up on the same operator index, where
> ideally I'd like to distribute it evenly between all operator indexes in
> the cluster.
>
>
>
> Is there any way of doing this?
>
> --
>
> Best Regards,
> Yuval Itzchakov.
