Yes, that's correct. For reference, you can just take a look at the
DefaultPartitioner, which does nearly the same thing (with additional logic
to do round robin when there isn't a key):
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L67
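Below is a rough, Kafka-free sketch of the shape of that logic, assuming the key has already been serialized to bytes. The class name KeyOrRoundRobinSketch is hypothetical. Arrays.hashCode stands in for the murmur2 hash the real DefaultPartitioner uses, so the partition numbers will not match Kafka's. The sketch also ignores partition availability, which the real round-robin path takes into account.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of "hash the key if present, else round robin".
 * Arrays.hashCode is a stand-in for Kafka's murmur2, so results differ from Kafka.
 */
class KeyOrRoundRobinSketch {
    // Counter used only for records that have no key.
    private final AtomicInteger counter = new AtomicInteger(0);

    // Clear the sign bit so the modulo result is never negative.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    int partition(byte[] keyBytes, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be > 0");
        }
        if (keyBytes == null) {
            // No key: cycle through the partitions one record at a time.
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // Key present: identical key bytes always map to the same partition.
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        KeyOrRoundRobinSketch p = new KeyOrRoundRobinSketch();
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(p.partition(key, 6) == p.partition(key, 6)); // true
        System.out.println(p.partition(null, 3)); // 0
        System.out.println(p.partition(null, 3)); // 1
    }
}
```

A keyed record stays on one partition only while the partition count is unchanged. Adding partitions remaps keys, both in this sketch and in any hash-mod scheme.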

The Cluster object contains cached metadata about the cluster, so the cost
of this lookup is minimal: it is an in-memory lookup, not a request to the
brokers.

-Ewen

On Tue, Nov 22, 2016 at 5:46 AM, Marina <ppi...@yahoo.com.invalid> wrote:

> Hi, I'm trying to upgrade my 0.8 producer to the 0.9 (0.10) APIs, and
> noticed that the way to implement a custom Partitioner has changed.
> In 0.8, I had implemented this interface: kafka.producer.Partitioner
> with this implementation of the partition() method, where the goal is to
> distribute events evenly by their custom IDs over all available
> partitions:
>
>     @Override
>     public int partition(Object myIdAsObject, int numberOfPartitions) {
>         String myId = (String) myIdAsObject;
>         int partitionNumber = (myId.hashCode() & 0x7fffffff) % numberOfPartitions;
>         return partitionNumber;
>     }
>
> In 0.9, I found this interface: org.apache.kafka.clients.producer.Partitioner
> and the partition method has different arguments. Here is my attempt to
> replicate the same partitioning logic:
>
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes,
>             Object value, byte[] valueBytes, Cluster cluster) {
>         int numberOfPartitions = cluster.partitionCountForTopic(topic);
>         String myId = (String) key;
>         int partitionNumber = (myId.hashCode() & 0x7fffffff) % numberOfPartitions;
>         return partitionNumber;
>     }
>
> Questions:
> -- Is this the right way to find the current number of partitions for the
>    topic the producer is configured for?
> -- If yes, what is the performance impact of querying Cluster for the
>    number of partitions when processing *each* event?
> -- What are the 'value' and 'valueBytes' objects: the actual message?
> -- Assuming I am sending events as:
>
>        ProducerRecord<byte[], byte[]> record =
>                new ProducerRecord<byte[], byte[]>(destinationTopic,
>                        myId.getBytes(), eventBody.getBytes());
>        producer.send(record);
>
>    will the correct values (the myId and the event body) be passed to the
>    custom Partitioner's partition() method?
>
> If you have references to examples of implementing custom Partitioners,
> that would be great. The only one I could find was this:
> http://howtoprogram.xyz/2016/06/04/write-apache-kafka-custom-partitioner/
> but it is too simplistic, as it does not really use the info about the
> number of partitions.
> Thanks!
> Marina
>
>
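This note covers the last two questions. In the KafkaProducer source, the partitioner receives the record's original key and value objects (key, value) along with their serialized forms (keyBytes, valueBytes). So value and valueBytes are the message body. With ProducerRecord<byte[], byte[]>, key is therefore a byte[], and the `(String) key` cast in the 0.9 attempt would throw a ClassCastException.

One way around this is to decode keyBytes back into the id and apply the same 0.8 formula. The minimal sketch below does that and assumes ids are encoded as UTF-8 on the sending side. The quoted myId.getBytes() uses the platform default charset, so the sender would need to pass StandardCharsets.UTF_8 explicitly. LegacyIdPartitioning and partitionFor are hypothetical names, and the Kafka wiring appears only as a comment so the block runs without kafka-clients.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper: the 0.8 "hashCode mod partitions" rule applied to byte[] keys.
 * Assumes keys were produced with myId.getBytes(StandardCharsets.UTF_8).
 *
 * Possible wiring inside a class implementing
 * org.apache.kafka.clients.producer.Partitioner (not compiled here):
 *   int n = cluster.partitionsForTopic(topic).size(); // or partitionCountForTopic(topic)
 *   return LegacyIdPartitioning.partitionFor(keyBytes, n);
 */
final class LegacyIdPartitioning {
    private LegacyIdPartitioning() {}

    static int partitionFor(byte[] keyBytes, int numberOfPartitions) {
        if (keyBytes == null) {
            throw new IllegalArgumentException("this scheme requires a key (myId)");
        }
        if (numberOfPartitions <= 0) {
            throw new IllegalArgumentException("numberOfPartitions must be > 0");
        }
        // Recover the original id so String.hashCode matches the 0.8 behavior.
        String myId = new String(keyBytes, StandardCharsets.UTF_8);
        // Clear the sign bit so the modulo result is never negative.
        return (myId.hashCode() & 0x7fffffff) % numberOfPartitions;
    }

    public static void main(String[] args) {
        String myId = "customer-17";
        int n = 8;
        int fromBytes = partitionFor(myId.getBytes(StandardCharsets.UTF_8), n);
        int legacy = (myId.hashCode() & 0x7fffffff) % n; // the 0.8 formula
        System.out.println(fromBytes == legacy); // true
    }
}
```

Keeping String.hashCode rather than hashing the raw bytes means an id lands on the same partition it did with the 0.8 producer, as long as the partition count is unchanged.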


-- 
Thanks,
Ewen
