Could you expand a bit more on what you want to achieve?
(In particular /where/ you want to use this partitioner; as an operation
before a sink
or within a kafka sink)
On 24.10.2017 09:24, kla wrote:
Hey,
I would like to use a round-robin kafka partitioner in the apache flink.
(the default one)
I forked the Kafka's code from the DefaultPartitioner class.
public class HashPartitioner<T> extends KafkaPartitioner<T> implements
Serializable {
private final AtomicInteger counter = new AtomicInteger(new
Random().nextInt());
@Override
public int partition(T next, byte[] serializedKey, byte[]
serializedValue, int numPartitions) {
if (serializedKey == null) {
int nextValue = counter.getAndIncrement();
// key is null choose randomly
return toPositive(nextValue) % numPartitions;
} else {
// hash the keyBytes to choose a partition
return toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
}
private static int toPositive(int number) {
return number & 0x7fffffff;
}
}
Is it a better way to do it ?
Thanks,
Konstantin
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/