The keyBy argument function is deterministic under the same MaxParallelism, which makes sure the key group is always the same for the same key. My goal here is to distribute keys evenly among operators even with different parallelism. I implemented the mapping in a different way, by exhausting
From: Yuval Itzchakov
Sent: Thursday, 4 November 2021 08:25
To: naitong Xiao
Cc: user
Subject: Re: Custom partitioning of keys with keyBy
Thank you Schwalbe, David and Naitong for your answers!
*David*: This is what we're currently doing ATM, and I wanted to know if
there's any simplified approach to this. This is what we have so far:
https://gist.github.com/YuvalItzchakov/9441a4a0e80609e534e69804e94cb57b
*Naitong*: The keyBy intern
I think I had a similar scenario several months ago, here is my related code:
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

val MAX_PARALLELISM = 16
val KEY_RAND_SALT = "73b46"

logSource.keyBy { value =>
  // Compute the key group Flink would assign to the raw key, then fold it
  // into a salted key so keys spread deterministically across key groups.
  val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, MAX_PARALLELISM)
  s"$KEY_RAND_SALT$keyGroup"
}
The keyGroup is
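The key group alone does not pick the subtask; Flink then maps key groups onto operator subtasks by range. A minimal sketch of that mapping (it mirrors the formula in Flink's KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup; the object and method names below are local stand-ins, not Flink API):

```scala
// Stand-in sketch of Flink's key-group-to-subtask mapping:
// operatorIndex = keyGroup * parallelism / maxParallelism
object KeyGroupDemo {
  def operatorIndex(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism
}
```

With maxParallelism = 16 and parallelism = 4, key groups 0..3 land on subtask 0, 4..7 on subtask 1, and so on; salting keys with their key group therefore spreads them across subtasks at any parallelism up to 16.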
Another possibility, if you know in advance the values of the keys, is to
find a mapping that transforms the original keys into new keys that will,
in fact, end up in disjoint key groups that will, in turn, be assigned to
different slots (given a specific parallelism). This is ugly, but feasible.
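That search for a mapping can be sketched as a brute-force hunt for a salt per key. This is only a sketch under stated assumptions: `bucket` below uses plain `hashCode` as a stand-in for Flink's murmur-based `assignToKeyGroup`, so any salts found this way would have to be recomputed against Flink's actual hashing, and `DisjointKeySearch`/`findSalts` are hypothetical names.

```scala
// Greedy search: for each known key, try numeric salts until the salted key
// lands in a bucket no earlier key occupies. Works only when keys.size <= numBuckets.
object DisjointKeySearch {
  // Stand-in bucket function; Flink really uses murmurHash(key.hashCode) % maxParallelism.
  def bucket(key: String, numBuckets: Int): Int =
    Math.floorMod(key.hashCode, numBuckets)

  def findSalts(keys: Seq[String], numBuckets: Int, maxSalt: Int = 10000): Option[Map[String, String]] = {
    require(keys.size <= numBuckets, "need at least as many buckets as keys")
    keys.foldLeft(Option((Map.empty[String, String], Set.empty[Int]))) {
      case (Some((salts, used)), key) =>
        (0 until maxSalt).view
          .map(i => (i.toString, bucket(key + i, numBuckets)))
          .find { case (_, b) => !used(b) }
          .map { case (salt, b) => (salts + (key -> salt), used + b) }
      case (None, _) => None
    }.map(_._1)
  }
}
```

The greedy pass can in principle fail where backtracking would succeed, but with a generous salt range it finds disjoint buckets quickly for small key sets.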
Hi Yuval,
Just a couple of comments:
* Assuming that all your 4 different keys are evenly distributed and you send them to (only) 3 buckets, you would expect at least one bucket to cover 2 of your keys, hence the 50%.
* With low-entropy keys, avoiding data skew is quite difficult.
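The first point is just the pigeonhole principle; a quick check (SkewCheck is a hypothetical helper written for this note, assuming evenly weighted keys):

```scala
// With numKeys evenly weighted keys over numBuckets buckets, some bucket holds
// at least ceil(numKeys / numBuckets) keys, and hence that share of the traffic.
// For 4 keys over 3 buckets that is 2 keys, i.e. 2/4 = 50% of the data.
object SkewCheck {
  def maxBucketShare(numKeys: Int, numBuckets: Int): Double =
    math.ceil(numKeys.toDouble / numBuckets) / numKeys
}
```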