Hi,

I was playing with the new producer in 0.8.2.1 using partition keys ("semantic 
partitioning" I believe is the phrase?). I noticed that the default partitioner 
in 0.8.2.1 does not partition items the same way the old 0.8.1.1 default 
partitioner did. For a test item, the old producer was sending it to 
partition 0, whereas the new producer was sending it to partition 4.

Digging in the code, it appears that the partitioning logic is different 
between the old and new producers. Both of them hash the key, but they use 
different hashing algorithms.

Old partitioner:
./core/src/main/scala/kafka/producer/DefaultPartitioner.scala:

  def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }

New partitioner:
./clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java:

        } else {
            // hash the key to choose a partition
            return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
        }

Here murmur2 is a custom hashing algorithm. (I'm assuming that murmur2 doesn't 
use the same logic as hashCode, especially since hashCode is overridable.)
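
To make the difference concrete, here is a minimal standalone sketch (not the 
Kafka code itself) of how the same key can land on different partitions once 
the hash function changes. Everything in it is an assumption for illustration: 
the class and method names are made up, CRC32 is only a stand-in for a hash 
computed over the key bytes (it is NOT murmur2), and toNonNegative is a 
hypothetical helper playing the role that Utils.abs plays in the snippets above.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class PartitionDemo {
    // Hypothetical helper: clears the sign bit so the result is never negative.
    // (Math.abs alone is not enough, since Math.abs(Integer.MIN_VALUE) is negative.)
    static int toNonNegative(int n) {
        return n & 0x7fffffff;
    }

    // Same shape as the old default partitioner: key.hashCode() modulo partition count.
    static int byHashCode(Object key, int numPartitions) {
        return toNonNegative(key.hashCode()) % numPartitions;
    }

    // Same shape as the new partitioner: a hash over the serialized key bytes.
    // CRC32 is used here purely as a stand-in for murmur2.
    static int byByteHash(byte[] keyBytes, int numPartitions) {
        CRC32 crc = new CRC32();
        crc.update(keyBytes);
        return toNonNegative((int) crc.getValue()) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "test-item";   // made-up key
        int numPartitions = 8;      // made-up partition count
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);

        System.out.println("hashCode-based partition:  " + byHashCode(key, numPartitions));
        System.out.println("byte-hash-based partition: " + byByteHash(keyBytes, numPartitions));
    }
}
```

Each scheme is deterministic on its own, but nothing ties the two results 
together, so a consumer that assumes one mapping may see keys move when the 
producer switches to the other.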

Was it intentional that the hashing algorithm would change between the old and 
new producer? If so, was this documented? I don't know if anyone was relying on 
the old default partitioner, as opposed to going round-robin or using their own 
custom partitioner. Do you expect it to change in the future? I'm guessing that 
one of the main reasons to have a custom hashing algorithm is so that you are 
in full control of the partitioning and can keep it stable (as opposed to being 
reliant on hashCode()).

Thanks,
-James
