Could you expand a bit more on what you want to achieve?
(In particular, /where/ do you want to use this partitioner: as an operation
before a sink, or within a Kafka sink?)

On 24.10.2017 09:24, kla wrote:
Hey,

I would like to use a round-robin Kafka partitioner (the default one) in
Apache Flink.

I adapted the code from Kafka's DefaultPartitioner class:

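import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.kafka.common.utils.Utils;

public class HashPartitioner<T> extends KafkaPartitioner<T> implements Serializable {

     private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

     @Override
     public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
         if (serializedKey == null) {
             // key is null: pick the next partition round-robin
             int nextValue = counter.getAndIncrement();
             return toPositive(nextValue) % numPartitions;
         } else {
             // hash the key bytes to choose a partition
             return toPositive(Utils.murmur2(serializedKey)) % numPartitions;
         }
     }

     // clear the sign bit so the modulo result is never negative
     private static int toPositive(int number) {
         return number & 0x7fffffff;
     }
}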
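
The selection logic of the partitioner above can also be checked in isolation, without Flink or Kafka on the classpath. What follows is only a minimal sketch under stated assumptions: the class name PartitionChooser and the injectable start value are hypothetical, and Arrays.hashCode stands in for Utils.murmur2, so keyed records will not land on the same partitions as with the real hash.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, dependency-free stand-in for the partition-selection logic above.
final class PartitionChooser {

    private final AtomicInteger counter;

    // The start value is injectable so the behaviour is reproducible in a test;
    // the partitioner above seeds the counter with a random int instead.
    PartitionChooser(int start) {
        this.counter = new AtomicInteger(start);
    }

    int choose(byte[] serializedKey, int numPartitions) {
        if (serializedKey == null) {
            // No key: advance the counter, i.e. round-robin over the partitions.
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // Keyed record: the same key bytes always map to the same partition.
        // Assumption: Arrays.hashCode replaces Utils.murmur2 for illustration only.
        return toPositive(Arrays.hashCode(serializedKey)) % numPartitions;
    }

    // Clears the sign bit so the modulo result is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }
}
```

With a start value of 0 and three partitions, successive null-key records go to partitions 0, 1, 2, 0, and so on. Because toPositive masks the sign bit, the result stays within [0, numPartitions) even after the counter overflows.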

Is there a better way to do it?

Thanks,
Konstantin