Hey, 

I would like to use a round-robin kafka partitioner in the apache flink. 
(the default one)

I forked the Kafka's code from the DefaultPartitioner class.

public class HashPartitioner<T> extends KafkaPartitioner<T> implements
Serializable {

    private final AtomicInteger counter = new AtomicInteger(new
Random().nextInt());

    @Override
    public int partition(T next, byte[] serializedKey, byte[]
serializedValue, int numPartitions) {

        if (serializedKey == null) {
            int nextValue = counter.getAndIncrement();
            // key is null choose randomly
            return toPositive(nextValue) % numPartitions;
        } else {
            // hash the keyBytes to choose a partition
            return toPositive(Utils.murmur2(serializedKey)) % numPartitions;
        }
    }

    private static int toPositive(int number) {
        return number & 0x7fffffff;

    }
}

Is it a better way to do it ?

Thanks,
Konstantin



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to