Hi Chesnay, Thanks for your reply.
I would like to use the partitioner within the Kafka Sink operation. By default kafka sink is using FixedPartitioner: public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) { this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>()); } So I have 12 kafka topic partitions and I have 2 Flink partitions, and I have unbalanced partitioning. According to the java doc in the FixedPartitioner class which is following: * Not all Kafka partitions contain data * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will * cause a lot of network connections between all the Flink instances and all the Kafka brokers According to the this I have to use a round-robin kafka partitioner. And what is the right way to do it ? Thanks again. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/