Thanks for your comment.
If I write the KafkaPartitioner anyway, I still have to somehow pass the
*kafka.producer.Partitioner* into it, which is not so easy.
So I found the easiest solution for this: you just have to pass a /null/
value:
outputStream.addSink(new FlinkKafkaProducer010<>(producerProperties.getProp
So you want to use the kafka partitioner directly?
How about an adapter?
public class KafkaPartitionerWrapper extends KafkaPartitioner implements Serializable {

    private final kafka.producer.Partitioner partitioner;

    public KafkaPartitionerWrapper(kafka.producer.Partitioner partitioner) {
        this.partitioner = partitioner;
    }
    // ... delegate the partition(...) call to the wrapped partitioner ...
}
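For reference, here is a minimal self-contained sketch of that adapter idea. Note that `LegacyPartitioner` and `SinkPartitioner` are simplified stand-ins for kafka.producer.Partitioner and Flink's KafkaPartitioner, not the real APIs, so the real classes' method signatures will differ:

```java
import java.io.Serializable;

// Hypothetical stand-in for kafka.producer.Partitioner (not the real API)
interface LegacyPartitioner {
    int partition(Object key, int numPartitions);
}

// Hypothetical stand-in for Flink's KafkaPartitioner (not the real API)
abstract class SinkPartitioner implements Serializable {
    abstract int partition(Object key, int numPartitions);
}

// The adapter: Flink calls partition(), and we delegate to the wrapped
// Kafka partitioner, so Kafka's partitioning logic is reused unchanged.
class PartitionerAdapter extends SinkPartitioner {
    private final LegacyPartitioner delegate;

    PartitionerAdapter(LegacyPartitioner delegate) {
        this.delegate = delegate;
    }

    @Override
    int partition(Object key, int numPartitions) {
        return delegate.partition(key, numPartitions);
    }
}
```

The same delegation pattern should carry over to the real classes; only the method names and parameter lists need adjusting to the actual Flink and Kafka interfaces.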
Exactly, I did it like this. The only thing is that I am using Flink version
1.2.0, and in this version the class name is KafkaPartitioner.
But the problem is that I would not like to "fork" Kafka's source code.
(Please check my first comment.)
Thanks,
Konstantin
...unbalanced partitioning.
According to the javadoc in the FixedPartitioner class, which is the following:

* Not all Kafka partitions contain data
* To avoid such an unbalanced partitioning, use a round-robin kafka
partitioner. (note that this will
* cause a lot of network connections between all the Flink instances and
all the Kafka brokers)

According to this I have to use a round-robin Kafka partitioner. And
what is the best way to do this?
Could you expand a bit more on what you want to achieve?
(In particular, /where/ do you want to use this partitioner: as an operation
before a sink, or within a Kafka sink?)
On 24.10.2017 09:24, kla wrote:
Hey,
I would like to use a round-robin kafka partitioner in the apache flink.
(the default one)
Hey,
I would like to use a round-robin kafka partitioner in the apache flink.
(the default one)
I forked Kafka's code from the DefaultPartitioner class.

public class HashPartitioner extends KafkaPartitioner implements Serializable {

    private final AtomicInteger counter = new AtomicInteger(0);
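The round-robin counter logic behind that forked class can be sketched in plain Java as follows. RoundRobinSelector is a made-up name for illustration; a real Flink sink partitioner would extend KafkaPartitioner and put this logic in its partition method:

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Round-robin partition selection: ignore the record key entirely and
// cycle through all partitions, so every partition receives data.
class RoundRobinSelector implements Serializable {

    private final AtomicInteger counter = new AtomicInteger(0);

    int nextPartition(int numPartitions) {
        // getAndIncrement eventually overflows to negative values,
        // so mask off the sign bit before taking the modulus.
        int next = counter.getAndIncrement() & Integer.MAX_VALUE;
        return next % numPartitions;
    }
}
```

For three partitions this yields 0, 1, 2, 0, 1, 2, ... which is exactly the balanced assignment the FixedPartitioner javadoc recommends, at the cost of each Flink instance connecting to every broker.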