Hi!

you will have to modify your partitioner to implement the FlinkKafkaPartitioner <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java>interface instead.
You can then plug this into any kafka sink through on of the constructors.

Regards,
Chesnay

On 24.10.2017 22:15, kla wrote:
Hi Chesnay,

Thanks for your reply.

I would like to use the partitioner within the Kafka Sink operation.

By default kafka sink is using FixedPartitioner:

        public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T>
serializationSchema, Properties producerConfig) {
                this(topicId, serializationSchema, producerConfig, new
FixedPartitioner<T>());
        }

So I have 12 kafka topic partitions and I have 2 Flink partitions, and I
have unbalanced partitioning.
According to the java doc in the FixedPartitioner class which is following:

  *  Not all Kafka partitions contain data
  *  To avoid such an unbalanced partitioning, use a round-robin kafka
partitioner. (note that this will
  *  cause a lot of network connections between all the Flink instances and
all the Kafka brokers

According to the this I have to use a round-robin kafka partitioner. And
what is the right way to do it ?

Thanks again.



--
Sent from:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply via email to