Hi community,

And by the way, during *FlinkKafkaProducer#initProducer*, the *flinkKafkaPartitioner* is only opened when it is NOT null. Unfortunately, that is not the case here, because the field is set to null whenever a *KafkaSerializationSchemaWrapper* is passed to the constructor.
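To make the combined effect of the two flaws concrete, here is a minimal, self-contained Java sketch. It does not use Flink at all: *FixedPartitioner*, *SchemaWrapper*, *Producer* and *partitionsPerSubtask* are hypothetical stand-ins that only mirror the logic described in this thread. It assumes, as in *FlinkFixedPartitioner*, that a subtask picks `partitions[parallelInstanceId % partitions.length]`.

```java
import java.util.Arrays;

public class PartitionerNotOpenedDemo {

    // Hypothetical stand-in for FlinkFixedPartitioner (not the real Flink class).
    static class FixedPartitioner {
        // Defaults to 0 and is only set by open().
        private int parallelInstanceId;

        void open(int parallelInstanceId, int parallelInstances) {
            this.parallelInstanceId = parallelInstanceId;
        }

        int partition(int[] partitions) {
            return partitions[parallelInstanceId % partitions.length];
        }
    }

    // Hypothetical stand-in for KafkaSerializationSchemaWrapper: keeps the partitioner but never opens it.
    static class SchemaWrapper {
        private final FixedPartitioner partitioner;

        SchemaWrapper(FixedPartitioner partitioner) {
            this.partitioner = partitioner;
        }

        int targetPartition(int[] partitions) {
            return partitioner.partition(partitions);
        }
    }

    // Hypothetical stand-in for FlinkKafkaProducer.
    static class Producer {
        private final FixedPartitioner flinkKafkaPartitioner;
        private final SchemaWrapper schema;

        Producer(SchemaWrapper schema) {
            this.schema = schema;
            // The producer's own partitioner field is null once a wrapper is supplied.
            this.flinkKafkaPartitioner = null;
        }

        void initProducer(int subtaskIndex, int parallelism) {
            // Mirrors the null check: only the producer's own field would be opened.
            if (flinkKafkaPartitioner != null) {
                flinkKafkaPartitioner.open(subtaskIndex, parallelism);
            }
        }

        int targetPartition(int[] partitions) {
            return schema.targetPartition(partitions);
        }
    }

    // Returns the partition that each subtask would write to.
    static int[] partitionsPerSubtask(int parallelism, int[] kafkaPartitions) {
        int[] result = new int[parallelism];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            Producer producer = new Producer(new SchemaWrapper(new FixedPartitioner()));
            producer.initProducer(subtask, parallelism);
            result[subtask] = producer.targetPartition(kafkaPartitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [0, 0, 0, 0]: no partitioner was ever opened, so every subtask picks partitions[0]
        System.out.println(Arrays.toString(partitionsPerSubtask(4, new int[] {0, 1, 2, 3})));
    }
}
```

Every subtask lands on the first partition because neither the producer nor the wrapper ever calls `open`.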
So these logic flaws together lead to this serious bug, and we recommend initializing the FlinkKafkaPartitioner in *KafkaSerializationSchemaWrapper#open*.

Sincerely,
Weike

On Thu, Sep 3, 2020 at 8:15 PM DONG, Weike <kyled...@connect.hku.hk> wrote:
> Hi community,
>
> We have found a serious issue with the newly introduced *KafkaSerializationSchemaWrapper* class, which, under certain conditions, causes *FlinkKafkaProducer* to write only to partition 0 of the given Kafka topic.
>
> First, consider the constructor in the universal version of *FlinkKafkaProducer* that uses *FlinkFixedPartitioner* as the custom partitioner.
>
> Tracing down the call path, *KafkaSerializationSchemaWrapper* is used to wrap that custom partitioner, i.e. *FlinkFixedPartitioner*.
>
> However, the implementation of *KafkaSerializationSchemaWrapper* never calls the *open* method of the given partitioner, which is essential for the partitioner to initialize its runtime state, such as *parallelInstanceId* in *FlinkFixedPartitioner*.
>
> Therefore, when *FlinkKafkaProducer* later calls *KafkaSerializationSchemaWrapper#serialize*, *FlinkFixedPartitioner#partition* always returns 0, because *parallelInstanceId* was never initialized.
>
> Eventually, all of the data goes only to partition 0 of the given Kafka topic, creating severe data skew in the sink.
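The recommended fix can be sketched the same way. Again, this is a hypothetical, Flink-free model: *SchemaWrapper*, its setters *setParallelInstanceId* / *setNumParallelInstances*, and *partitionsPerSubtask* are illustrative names, not the actual Flink API. The only point it shows is that once the wrapper opens the partitioner in its own `open()` hook, each subtask gets its own partition regardless of the producer's null check.

```java
import java.util.Arrays;

public class WrapperOpensPartitionerDemo {

    // Hypothetical stand-in for FlinkFixedPartitioner (not the real Flink class).
    static class FixedPartitioner {
        private int parallelInstanceId;

        void open(int parallelInstanceId, int parallelInstances) {
            this.parallelInstanceId = parallelInstanceId;
        }

        int partition(int[] partitions) {
            return partitions[parallelInstanceId % partitions.length];
        }
    }

    // Hypothetical stand-in for KafkaSerializationSchemaWrapper with the proposed fix applied.
    static class SchemaWrapper {
        private final FixedPartitioner partitioner;
        private int parallelInstanceId;
        private int numParallelInstances;

        SchemaWrapper(FixedPartitioner partitioner) {
            this.partitioner = partitioner;
        }

        // Hypothetical setters: the producer hands over its subtask info before open() runs.
        void setParallelInstanceId(int id) {
            this.parallelInstanceId = id;
        }

        void setNumParallelInstances(int n) {
            this.numParallelInstances = n;
        }

        // The fix: the wrapper initializes the partitioner it wraps.
        void open() {
            if (partitioner != null) {
                partitioner.open(parallelInstanceId, numParallelInstances);
            }
        }

        int targetPartition(int[] partitions) {
            return partitioner.partition(partitions);
        }
    }

    // Returns the partition that each subtask would write to.
    static int[] partitionsPerSubtask(int parallelism, int[] kafkaPartitions) {
        int[] result = new int[parallelism];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            SchemaWrapper wrapper = new SchemaWrapper(new FixedPartitioner());
            wrapper.setParallelInstanceId(subtask);
            wrapper.setNumParallelInstances(parallelism);
            wrapper.open();
            result[subtask] = wrapper.targetPartition(kafkaPartitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [0, 1, 2, 3]: each subtask now writes to a different partition
        System.out.println(Arrays.toString(partitionsPerSubtask(4, new int[] {0, 1, 2, 3})));
    }
}
```

Putting the `open` call in the wrapper keeps the initialization next to the object that owns the partitioner, so it no longer depends on which producer constructor was used.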