Thanks for your comment. If I write the KafkaPartitioner anyway I have to somehow pass the *kafka.producer.Partitioner* which is not so easy.
So I have found the easiest solution for this, you have just pass /null/ value: outputStream.addSink(new FlinkKafkaProducer010<>(producerProperties.getProperty(TOPIC), new EventSerializationSchema(), producerProperties, null)); Which means that *FlinkKafkaProducer* will automatically use the Kafka's default partitioner. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/