Hi Yuval,
… I had to do some guesswork with regard to your use case … it is still not
exactly clear to me what you want to achieve. However, I remember having done
something similar in this area two years ago.
Unfortunately I cannot find the implementation anymore ☹
* If you tried a combination of .partitionCustom() and
  reinterpretAsKeyedStream(): this will fail, because reinterpretAsKeyedStream()
  forces a ForwardPartitioner.
* You could still model your code after the implementation of
  reinterpretAsKeyedStream() and use your own partitioner instead [1].
* Partitioning is relevant in two places:
    * The outgoing Transform, for selecting the output channel
    * The incoming Transform, for selecting the correct key range for state
      primitives
* You need to make sure that both sides agree.
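To illustrate what "both sides agree" means, here is a minimal, Flink-free sketch in Java. It assumes the range-based mapping between key groups and subtasks that Flink's KeyGroupRangeAssignment is understood to use; the class and method names are hypothetical, and the values 16 and 3 are just the max parallelism and bucket count mentioned in this thread. It checks that a sender-side channel choice always lands on the subtask whose key group range contains the record's key group.

```java
import java.util.function.IntUnaryOperator;

final class PartitionAgreementSketch {

    // Sender side: output channel for a record that belongs to the given key group.
    // Assumption: range-based assignment, keyGroup * parallelism / maxParallelism.
    static int channelForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Receiver side: {first, last} key group owned by the given subtask.
    static int[] keyGroupRange(int maxParallelism, int parallelism, int subtask) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // True if every key group is sent to the subtask whose range contains it.
    static boolean sidesAgree(int maxParallelism, int parallelism, IntUnaryOperator sender) {
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            int channel = sender.applyAsInt(keyGroup);
            if (channel < 0 || channel >= parallelism) {
                return false;
            }
            int[] range = keyGroupRange(maxParallelism, parallelism, channel);
            if (keyGroup < range[0] || keyGroup > range[1]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int maxParallelism = 16, parallelism = 3;
        // A sender that follows the same range arithmetic as the receiver.
        boolean rangeBased = sidesAgree(maxParallelism, parallelism,
                kg -> channelForKeyGroup(maxParallelism, parallelism, kg));
        // A hypothetical custom partitioner that ignores the receiver's ranges.
        boolean modulo = sidesAgree(maxParallelism, parallelism, kg -> kg % parallelism);
        System.out.println("range-based sender agrees: " + rangeBased);
        System.out.println("modulo sender agrees: " + modulo);
    }
}
```

In this sketch the modulo sender is rejected: it would send key group 1 to subtask 1, whose range only covers key groups 6 to 10, so keyed state access on the receiving side would not match the record.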
… for the last question regarding the more sophisticated scenario … please give
me a little more time for a sketch … I also want to understand a little better
your use case
Hope this helps
Thias
[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java#L185-L210
From: Yuval Itzchakov <[email protected]>
Sent: Thursday, 4 November 2021 08:25
To: naitong Xiao <[email protected]>
Cc: user <[email protected]>
Subject: Re: Custom partitioning of keys with keyBy
Thank you Schwalbe, David and Naitong for your answers!
David: This is what we're doing at the moment, and I wanted to know if there's
a simpler approach to this. This is what we have so far:
https://gist.github.com/YuvalItzchakov/9441a4a0e80609e534e69804e94cb57b
Naitong: The keyBy internally will rehash the key you provide it. How do you
make sure that the re-hashed key is still in the desired key group range?
Schwalbe:
* Assuming that all your 4 different keys are evenly distributed, and you
  send them to (only) 3 buckets, you would expect at least one bucket to cover 2
  of your keys, hence the 50% - You're right, this is the desired behavior I
  actually want. I don't want them to be truly uniformly distributed, as I want
  to batch multiple keys together in the same bucket.
* With low entropy keys avoiding data skew is quite difficult - I
  understand, and we are well aware of the implications.
* But your situation could be worse, all 4 keys could end up in the same
  bucket, if the hash function in use happens to generate collisions for the 4
  keys, in which case 2 of your 3 buckets would not process any events … this
  could also lead to watermarks not progressing … - We take care of this
  internally as we understand there may be skew across the buckets. I don't care
  about watermarks at this stage.
* There are two proposals on how to improve the situation:
    * Use the same parallelism and max parallelism for the relevant
      operators and implement a manual partitioner
        * A manual partitioner is also good in situations where you want to
          lower the bias, you know the exact distribution of your key space, and
          you can rearrange keys to even out the numbers - I looked into custom
          partitioning, but it seems not to work with KeyedDataStream, and I need
          the distribution to be performed when keying the stream.
    * More sophisticated (if possible), divide-and-conquer like -
      Interesting idea, but I'm not sure I follow. Could you possibly provide a
      sketch of the transformations on the stream?
        * Key by your ‘small’ key plus some arbitrary attribute with higher
          entropy
        * Window aggregate first on that artificial key
        * Aggregate the results on your original ‘small’ key
        * This could be interesting for high-throughput situations where you
          actually want to run with a parallelism higher than the number of
          different ‘small’ keys
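One possible reading of the divide-and-conquer steps above, as a Flink-free sketch in Java. It models the events of a single window as a list and uses a sum as the aggregate; `Event`, `sessionId` (standing in for "some attribute with higher entropy") and `fanOut` are hypothetical names that do not come from this thread. In a real job each stage would presumably be its own keyBy plus window aggregation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TwoStageAggregationSketch {

    // Hypothetical event: a low-entropy key, a high-entropy attribute, and a value.
    static final class Event {
        final int smallKey;
        final String sessionId;
        final long amount;

        Event(int smallKey, String sessionId, long amount) {
            this.smallKey = smallKey;
            this.sessionId = sessionId;
            this.amount = amount;
        }
    }

    // Stage 1: pre-aggregate on the artificial key "smallKey#salt".
    static Map<String, Long> preAggregate(List<Event> window, int fanOut) {
        Map<String, Long> partial = new TreeMap<>();
        for (Event e : window) {
            // The salt spreads one small key over up to fanOut artificial keys.
            int salt = Math.floorMod(e.sessionId.hashCode(), fanOut);
            partial.merge(e.smallKey + "#" + salt, e.amount, Long::sum);
        }
        return partial;
    }

    // Stage 2: drop the salt and combine the partial sums per original small key.
    static Map<Integer, Long> combine(Map<String, Long> partial) {
        Map<Integer, Long> total = new TreeMap<>();
        for (Map.Entry<String, Long> p : partial.entrySet()) {
            String artificialKey = p.getKey();
            int smallKey = Integer.parseInt(artificialKey.substring(0, artificialKey.indexOf('#')));
            total.merge(smallKey, p.getValue(), Long::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        List<Event> window = Arrays.asList(
                new Event(0, "a", 5), new Event(0, "b", 7), new Event(1, "c", 1),
                new Event(0, "d", 2), new Event(1, "e", 10));
        System.out.println("partial: " + preAggregate(window, 4));
        System.out.println("total:   " + combine(preAggregate(window, 4)));
    }
}
```

This only works as-is for aggregates that can be combined from partial results (sums, counts, min/max); anything else would need a different second stage.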
On Thu, Nov 4, 2021 at 5:48 AM naitong Xiao <[email protected]> wrote:
I think I had a similar scenario several months ago, here is my related code:
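    import org.apache.flink.runtime.state.KeyGroupRangeAssignment

    val MAX_PARALLELISM = 16
    val KEY_RAND_SALT = "73b46"

    logSource.keyBy { value =>
      // Bucket id: the key group Flink assigns to the original key.
      val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, MAX_PARALLELISM)
      // The salted bucket id becomes the key that keyBy actually hashes.
      s"$KEY_RAND_SALT$keyGroup"
    }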
The keyGroup is just like your bucket id, and the KEY_RAND_SALT was generated
by some script to map bucket id evenly to operators under the max parallelism.
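The script that generated the salt is not part of this thread. A brute-force search along the following lines is one way such a salt could be found; this is a sketch in Java under stated assumptions, not the original script. The names are hypothetical, the operator index is assumed to follow the range-based formula `keyGroup * parallelism / maxParallelism`, and the hash in `main` is a stand-in that is NOT Flink's. For a real job the `keyGroupOf` function would need to call KeyGroupRangeAssignment.assignToKeyGroup on the salted key instead.

```java
import java.util.function.IntBinaryOperator;

final class SaltSearchSketch {

    // Operator index that owns a key group (assumed range-based assignment).
    static int operatorIndex(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Number of buckets each operator would receive for one candidate salt.
    // keyGroupOf maps (salt, bucket) to a key group in [0, maxParallelism).
    static int[] load(int salt, int buckets, int maxParallelism, int parallelism,
                      IntBinaryOperator keyGroupOf) {
        int[] perOperator = new int[parallelism];
        for (int bucket = 0; bucket < buckets; bucket++) {
            int keyGroup = keyGroupOf.applyAsInt(salt, bucket);
            perOperator[operatorIndex(maxParallelism, parallelism, keyGroup)]++;
        }
        return perOperator;
    }

    // First salt in [0, maxAttempts) whose per-operator loads differ by at most
    // one bucket, or -1 if no such salt is found.
    static int findSalt(int buckets, int maxParallelism, int parallelism,
                        IntBinaryOperator keyGroupOf, int maxAttempts) {
        for (int salt = 0; salt < maxAttempts; salt++) {
            int min = Integer.MAX_VALUE;
            int max = 0;
            for (int n : load(salt, buckets, maxParallelism, parallelism, keyGroupOf)) {
                min = Math.min(min, n);
                max = Math.max(max, n);
            }
            if (max - min <= 1) {
                return salt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int maxParallelism = 16, parallelism = 3, buckets = 3;
        // Stand-in hash for illustration only; replace with the real key group
        // assignment before trusting the result for an actual job.
        IntBinaryOperator standIn = (salt, bucket) ->
                Math.floorMod((Integer.toHexString(salt) + bucket).hashCode(), maxParallelism);
        int salt = findSalt(buckets, maxParallelism, parallelism, standIn, 100_000);
        System.out.println(salt < 0 ? "no salt found" : "salt = " + Integer.toHexString(salt));
    }
}
```

A salt found this way is only valid for the exact combination of bucket count, parallelism and max parallelism it was searched for, so it would have to be regenerated whenever any of those change.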
On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov <[email protected]> wrote:
Hi,
I have a use-case where I'd like to partition a KeyedDataStream a bit
differently than how Flink's default partitioning works with key groups.
<image.png>
What I'd like to be able to do is take all my data and split it up evenly
between 3 buckets which will store the data in the state. Using the key above
works, but splits the data unevenly between the different key groups, as
usually the key space is very small (0 - 3). What ends up happening is that
sometimes 50% of the keys end up on the same operator index, where ideally I'd
like to distribute it evenly between all operator indexes in the cluster.
Is there any way of doing this?
--
Best Regards,
Yuval Itzchakov.