I’ve come up with a couple of solutions, since we too have a power-law 
distribution. However, we have not put anything into practice.

Fixed Slicing

One simple thing to do is to take each key and slice it into some fixed number 
of sub-partitions. Pick a per-message sub-slice index i (round-robin or random, 
0 through 9), and your function might be:

(hash(key) + i) % num

in order to distribute each key across 10 partitions. Equivalently, append the 
sub-slice index to the key:

hash(key + '0') % num
hash(key + '1') % num
...
hash(key + '9') % num

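Here is a minimal sketch of the idea in Python (illustrative names; it assumes 
a 100-partition topic, a 10-way slice, and CRC32 standing in for whatever hash 
you actually use):

  import random
  import zlib

  NUM_PARTITIONS = 100   # assumed topic size
  SLICES = 10            # sub-partitions each key is spread over

  def stable_hash(s):
      # Stable across processes, unlike Python's built-in hash() for str.
      return zlib.crc32(s.encode("utf-8"))

  def fixed_slice_partition(key):
      # Producer side: pick a random sub-slice per message; per-producer
      # round-robin works just as well.
      i = random.randrange(SLICES)
      return (stable_hash(key) + i) % NUM_PARTITIONS

  def partitions_for_key(key):
      # Consumer side: a subscriber that wants all data for `key` has to
      # read all SLICES partitions the key can land in.
      base = stable_hash(key)
      return sorted({(base + i) % NUM_PARTITIONS for i in range(SLICES)})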

Hyperspace Hashing

If your data is multi-dimensional, then you might find hyperspace hashing 
useful. I’ll give a simple example, but it’s easy to generalize. Suppose that 
you have two dimensions you’d like to partition on: customer id (C) and city 
location (L). You’d like to be able to subscribe to all data for some subset of 
customers, and you’d also like to be able to subscribe to all data for some 
subset of locations. Suppose that this data goes into a topic with 256 
partitions.

For any piece of data, you’d construct the partition it goes to like so:

((hash(C) % 16) << 4) + (hash(L) % 16)

What that is basically saying is: take C, map it to one of 16 different spots, 
and set that as the high 4 bits of an 8-bit int. Then take the location, map it 
to one of 16 different spots, and set that as the low 4 bits of the int. The 
resulting number is the partition that piece of data goes to.

Now if you want one particular C, you subscribe to the 16 partitions that 
contain that C. If you want some particular L, you subscribe to the 16 
partitions that contain that L.
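
A rough sketch of this two-dimensional case (illustrative names; assumes 256 
partitions, 4 bits per dimension, and CRC32 as a stand-in hash):

  import zlib

  def h16(value):
      # Map a value into one of 16 buckets (4 bits).
      return zlib.crc32(value.encode("utf-8")) % 16

  def hyperspace_partition(customer_id, location):
      # Customer bucket in the high 4 bits, location bucket in the low 4 bits,
      # giving a partition number in 0..255.
      return (h16(customer_id) << 4) | h16(location)

  def partitions_for_customer(customer_id):
      # The 16 partitions that can hold this customer (any location bucket).
      return [(h16(customer_id) << 4) | loc for loc in range(16)]

  def partitions_for_location(location):
      # The 16 partitions that can hold this location (any customer bucket).
      return [(cust << 4) | h16(location) for cust in range(16)]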

You can extend this scheme to an arbitrary number of dimensions subject to the 
number of partitions in the topic, and you can vary the number of bits that any 
particular dimension takes. This scheme suffers from a combinatorial explosion 
of partitions if you really want to query on lots of different dimensions, but 
you can see the Hyperdex paper for clues on how to deal with this.


Unbalanced Hashing

It’s easy to generate hash functions that are OK but not great. One example is 
DJB hash, which relies on two empirically determined constants:

http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function

(5381 and 33 in the above example)

If you can do offline analysis, and your distribution doesn’t change over time, 
then you can basically exhaustively search for two values that produce a hash 
function that better distributes the load.
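
As a rough sketch of what that offline search could look like (illustrative 
names; `key_weights` would come from your offline analysis, and the candidate 
ranges are arbitrary):

  from itertools import product

  def djb_like_hash(key, init, mult):
      # Generalized DJB-style hash; classic djb2 uses init=5381 and mult=33.
      h = init
      for ch in key:
          h = (h * mult + ord(ch)) & 0xFFFFFFFF
      return h

  def max_load(key_weights, init, mult, num):
      # Weight landing on the heaviest partition for this constant pair.
      loads = [0] * num
      for key, weight in key_weights.items():
          loads[djb_like_hash(key, init, mult) % num] += weight
      return max(loads)

  def search_constants(key_weights, num):
      # Brute-force a grid of candidate constants; smallest max load wins.
      candidates = product(range(1, 10000, 7), range(3, 256, 2))
      return min(candidates,
                 key=lambda c: max_load(key_weights, c[0], c[1], num))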


Greedy Knapsack

But if you’re OK doing offline analysis and generating your own hash function, 
then you can create one that’s simply a hard-coded list of mappings for the 
heaviest keys, falling back to a regular hash for the rest. The easiest way to 
do this programmatically is with a greedy algorithm:

  for each heavy key, k:
    assign k to the partition with the least assigned weight

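A sketch of that greedy assignment plus the fallback hash (illustrative names; 
`key_weights` holds the heavy keys and their observed weights from offline 
analysis):

  import heapq
  import zlib

  def build_greedy_map(key_weights, num):
      # Heaviest keys first; each one goes to the currently lightest partition.
      heap = [(0, p) for p in range(num)]        # (assigned weight, partition)
      heapq.heapify(heap)
      mapping = {}
      for key, weight in sorted(key_weights.items(), key=lambda kv: -kv[1]):
          load, part = heapq.heappop(heap)
          mapping[key] = part
          heapq.heappush(heap, (load + weight, part))
      return mapping

  def partition_for(key, mapping, num):
      # Hard-coded mapping for the heavy keys, regular hash for everything else.
      return mapping.get(key, zlib.crc32(key.encode("utf-8")) % num)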

The advantage to fixed slicing and hyperspace hashing is that you don’t have to 
know your distribution a priori, and it generally scales well as you increase 
the number of keys. The disadvantage is that one key’s data is split across 
multiple partitions.

The advantage to unbalanced hashing and greedy knapsack is that you can get 
close to an optimal partitioning scheme and all of one key resides in one 
partition. The downside is that you need to do partition mapping management as 
your distribution changes over time.

Hopefully that gives you some ideas!

Wes



> On May 3, 2016, at 9:09 AM, Jens Rantil <jens.ran...@tink.se> wrote:
> 
> Hi,
> 
> Not sure if this helps, but the way Loggly seem to do it is to have a
> separate topic for "noisy neighbors". See [1].
> 
> [1]
> https://www.loggly.com/blog/loggly-loves-apache-kafka-use-unbreakable-messaging-better-log-management/
> 
> Cheers,
> Jens
> 
> On Wed, Apr 27, 2016 at 9:11 PM Srikanth <srikanth...@gmail.com> wrote:
> 
>> Hello,
>> 
>> Is there a recommendation for handling producer side partitioning based on
>> a key with skew?
>> We want to partition on something like clientId. Problem is, this key has
>> a non-uniform distribution.
>> It's equally likely to see a key with 3k occurrences/day vs 100k/day vs
>> 65 million/day.
>> Cardinality of key is around 1500 and there are approx 1 billion records
>> per day.
>> Partitioning by hashcode(key)%numOfPartition will create a few "hot
>> partitions" and cause a few brokers(and consumer threads) to be overloaded.
>> Maybe these partitions with heavy load are evenly distributed among
>> brokers, maybe they are not.
>> 
>> I read KIP-22
>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-22+-+Expose+a+Partitioner+interface+in+the+new+producer>
>> that explains how one could write a custom partitioner.
>> I'd like to know how it was used to solve such data skew.
>> We can compute some statistics on key distribution offline and use it in
>> the partitioner.
>> Is that a good idea? Or is it way too much logic for a partitioner?
>> Anything else to consider?
>> Any thoughts or reference will be helpful.
>> 
>> Thanks,
>> Srikanth
>> 
> -- 
> 
> Jens Rantil
> Backend Developer @ Tink
> 
> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
> For urgent matters you can reach me at +46-708-84 18 32.
