Thank you Schwalbe, David and Naitong for your answers!

*David*: This is what we're currently doing, and I wanted to know if
there's a simpler approach. This is what we have so far:
https://gist.github.com/YuvalItzchakov/9441a4a0e80609e534e69804e94cb57b
*Naitong*: keyBy will internally re-hash the key you provide it. How do you
make sure that the re-hashed key still falls in the desired key group range?
*Schwalbe*:

   - Assuming that all your 4 different keys are evenly distributed, and
   you send them to (only) 3 buckets, you would expect at least one bucket to
   cover 2 of your keys, hence the 50% - You're right, this is the desired
   behavior I actually want; I don't want them to be truly uniformly
   distributed, as I want to batch multiple keys together in the same bucket.
   - With low entropy keys avoiding data skew is quite difficult - I
   understand, and we are well aware of the implications.
   - But your situation could be worse, all 4 keys could end up in the same
   bucket, if the hash function in use happens to generate collisions for the
   4 keys, in which case 2 of your 3 buckets would not process any events …
   this could also lead to watermarks not progressing … - We handle this
   internally, as we understand the buckets may end up skewed. I don't care
   about watermarks at this stage.
   - There are two proposals on how to improve the situation:
      - Use the same parallelism and max parallelism for the relevant
      operators and implement a manual partitioner
         - A manual partitioner is also good in situations where you want
          to lower the bias and you exactly know the distribution of your
          key space and rearrange keys to even-out numbers - I looked into
          custom partitioning, but it seems to not work with KeyedDataStream,
          and I need the distribution to be performed when keying the stream
          (see the first sketch below this list).
      - More sophisticated (if possible), divide-and-conquer like -
      Interesting idea, but I'm not sure I follow. Could you possibly provide
      a sketch of the transformations on the stream? I've put my own attempt
      at one below this list.
         - Key by your ‘small’ key plus some arbitrary attribute with
          higher entropy
         - Window aggregate first on that artificial key
         - Aggregate the results on your original ‘small’ key
         - This could be interesting for high-throughput situations where
          you actually want to run at a parallelism higher than the number
          of different ‘small’ keys
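
To make the first proposal concrete, here is roughly what I looked at for a
manual partitioner (just a sketch, assuming parallelism == max parallelism;
the Int elements stand in for my real events and are themselves the bucket
ids 0..2):

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Placeholder input: each element is directly the bucket id.
val buckets: DataStream[Int] = env.fromElements(0, 1, 2, 0, 1, 2)

val partitioned: DataStream[Int] = buckets.partitionCustom(
  new Partitioner[Int] {
    // Route bucket i straight to subtask i, no re-hashing involved.
    override def partition(bucket: Int, numPartitions: Int): Int =
      bucket % numPartitions
  },
  (bucket: Int) => bucket
)

The catch is that partitionCustom returns a plain DataStream rather than a
KeyedStream, so the downstream operator has no keyed state - which is why it
doesn't fit my case.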
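
And here is my attempt at sketching the divide-and-conquer idea, so you can
correct me if I misread it (the Event case class, the 16-way salt and the
10-second tumbling windows are all placeholders; the salt is attached in a
map before the keyBy, since key selectors have to be deterministic):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(key: Int, value: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[Event] =
  env.fromElements(Event(0, 1L), Event(1, 2L), Event(2, 3L), Event(3, 4L))

// Step 1: attach an arbitrary higher-entropy salt to the 'small' key.
val salted: DataStream[(Int, Int, Long)] =
  events.map(e => (e.key, scala.util.Random.nextInt(16), e.value))

// Step 2: window-aggregate first on the artificial (key, salt) pair.
val preAggregated: DataStream[(Int, Long)] = salted
  .keyBy(t => (t._1, t._2))
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .reduce((a, b) => (a._1, a._2, a._3 + b._3))
  .map(t => (t._1, t._3))

// Step 3: aggregate the partial results on the original 'small' key.
val result: DataStream[(Int, Long)] = preAggregated
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .reduce((a, b) => (a._1, a._2 + b._2))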



On Thu, Nov 4, 2021 at 5:48 AM naitong Xiao <xiaonait...@gmail.com> wrote:

> I think I had a similar scenario several months ago, here is my related
> code:
>
> val MAX_PARALLELISM = 16
> val KEY_RAND_SALT = "73b46"
>
> logSource.keyBy { value =>
>   val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, MAX_PARALLELISM)
>   s"$KEY_RAND_SALT$keyGroup"
> }
>
> The keyGroup is just like your bucket id,  and the KEY_RAND_SALT was
> generated by some script to map bucket id evenly to operators under the max
> parallelism.
>
> On Nov 3, 2021, 9:47 PM +0800, Yuval Itzchakov <yuva...@gmail.com>, wrote:
>
> Hi,
> I have a use-case where I'd like to partition a KeyedDataStream a bit
> differently than how Flink's default partitioning works with key groups.
>
> <image.png>
> What I'd like to be able to do is take all my data and split it up evenly
> between 3 buckets which will store the data in the state. Using the key
> above works, but splits the data unevenly between the different key groups,
> as usually the key space is very small (0 - 3). What ends up happening is
> that sometimes 50% of the keys end up on the same operator index, where
> ideally I'd like to distribute it evenly between all operator indexes in
> the cluster.
>
> Is there any way of doing this?
> --
> Best Regards,
> Yuval Itzchakov.
>
>

-- 
Best Regards,
Yuval Itzchakov.
