Hi community, We have found a serious issue with the newly-introduced *KafkaSerializationSchemaWrapper *class, which eventually let *FlinkKafkaProducer *only write to partition 0 in the given Kafka topic under certain conditions.
First let's look at this constructor in the universal version of *FlinkKafkaProducer*, and it uses FlinkFixedPartitioner as the custom partitioner. [image: image.png] And when we trace down the call path, *KafkaSerializationSchemaWrapper *is used to wrap the aforementioned custom partitioner, i.e. *FlinkFiexedPartitioner*. [image: image.png] However, we found that in the implementation of *KafkaSerializationSchemaWrapper*, it does not call the *open *method of the given partitioner, which is essential for the partitioner to initialize its environment variables like *parallelInstanceId *in *FlinkFixedPartitioner*. Therefore, when *KafkaSerializationSchemaWrapper#serialize* is later called by the FlinkKafkaProducer, *FlinkFiexedPartitioner#partition* would always return 0, because *parallelInstanceId *is not properly initialized. [image: image.png] Eventually, all of the data would go only to partition 0 of the given Kafka topic, creating severe data skew in the sink.