Hi!
You will have to modify your partitioner to implement the
FlinkKafkaPartitioner interface
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java>
instead.
You can then plug it into any Kafka sink through one of the constructors.
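For example, a round-robin partitioner could look roughly like this (a
sketch only; RoundRobinKafkaPartitioner is an illustrative name, and the
open()/partition() signatures are those of the FlinkKafkaPartitioner base
class linked above):

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    // Cycles through all partitions of the target topic so that every
    // partition receives data, unlike the default FixedPartitioner.
    public class RoundRobinKafkaPartitioner<T> extends FlinkKafkaPartitioner<T> {

        private int next = 0;

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            // Stagger the starting point per subtask so the subtasks
            // do not all begin with the same partition.
            this.next = parallelInstanceId;
        }

        @Override
        public int partition(T record, byte[] key, byte[] value,
                             String targetTopic, int[] partitions) {
            int p = partitions[next % partitions.length];
            next = (next + 1) % partitions.length;
            return p;
        }
    }

You would then pass it to the producer, e.g. (reusing the topic,
serialization schema and producer config from your snippet below):

    stream.addSink(new FlinkKafkaProducer010<>(
        topicId,
        serializationSchema,
        producerConfig,
        new RoundRobinKafkaPartitioner<MyType>()));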
Regards,
Chesnay
On 24.10.2017 22:15, kla wrote:
Hi Chesnay,
Thanks for your reply.
I would like to use the partitioner within the Kafka sink operation.
By default the Kafka sink uses the FixedPartitioner:
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
    this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
}
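For context, the FixedPartitioner effectively pins each parallel sink
subtask to a single Kafka partition, roughly like this (a simplified sketch
of the behavior, not the exact connector source; FixedLikePartitioner is an
illustrative name):

    // Simplified sketch: each sink subtask picks one partition in open()
    // and writes all of its records there.
    public class FixedLikePartitioner<T> extends FlinkKafkaPartitioner<T> {

        private int subtaskIndex;

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            this.subtaskIndex = parallelInstanceId; // pinned once per subtask
        }

        @Override
        public int partition(T record, byte[] key, byte[] value,
                             String targetTopic, int[] partitions) {
            // With 2 subtasks and 12 partitions, only 2 partitions ever get data.
            return partitions[subtaskIndex % partitions.length];
        }
    }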
So I have 12 Kafka topic partitions and 2 Flink sink instances, and I end
up with unbalanced partitioning (only 2 of the 12 partitions ever receive
data).
According to the Javadoc of the FixedPartitioner class:

 * Not all Kafka partitions contain data.
 * To avoid such an unbalanced partitioning, use a round-robin Kafka
 * partitioner (note that this will cause a lot of network connections
 * between all the Flink instances and all the Kafka brokers).
According to this, I have to use a round-robin Kafka partitioner. What is
the right way to do that?
Thanks again.