Hi community,

We have found a serious issue with the newly introduced
*KafkaSerializationSchemaWrapper* class which, under certain conditions,
causes *FlinkKafkaProducer* to write only to partition 0 of the given
Kafka topic.

First, let's look at this constructor in the universal version of
*FlinkKafkaProducer*, which uses *FlinkFixedPartitioner* as the custom
partitioner.

[image: image.png]

Tracing down the call path, we see that *KafkaSerializationSchemaWrapper*
is used to wrap the aforementioned custom partitioner, i.e.
*FlinkFixedPartitioner*.

[image: image.png]

However, we found that the implementation of
*KafkaSerializationSchemaWrapper* never calls the *open* method of the
given partitioner. That call is essential for the partitioner to initialize
its internal state, such as the *parallelInstanceId* field in
*FlinkFixedPartitioner*.

Therefore, when *KafkaSerializationSchemaWrapper#serialize* is later called
by *FlinkKafkaProducer*, *FlinkFixedPartitioner#partition* always returns 0,
because *parallelInstanceId* was never initialized and keeps its default
value of 0.
[image: image.png]

As a result, all of the data goes to partition 0 of the given Kafka
topic, creating severe data skew in the sink.
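
To make the failure mode concrete, here is a minimal, self-contained sketch
that does not depend on Flink. The classes *FixedPartitioner* and
*WrapperSketch* and the helper *route* are hypothetical stand-ins, not the
real Flink classes. The sketch assumes the real partitioner selects
partitions[parallelInstanceId % partitions.length], and it uses a
*forwardOpen* flag purely to contrast the two behaviors.

```java
// Simplified stand-ins for illustration only; these are NOT the real Flink classes.
class FixedPartitionerSketch {

    /** Hypothetical minimal partitioner mirroring the open/partition contract. */
    static class FixedPartitioner {
        // Java initializes int fields to 0, so this stays 0 until open() runs.
        private int parallelInstanceId;

        void open(int parallelInstanceId, int parallelInstances) {
            this.parallelInstanceId = parallelInstanceId;
        }

        // Assumed selection rule: one fixed partition per parallel instance.
        int partition(int[] partitions) {
            return partitions[parallelInstanceId % partitions.length];
        }
    }

    /** Hypothetical wrapper; forwards open() only when forwardOpen is true. */
    static class WrapperSketch {
        private final FixedPartitioner partitioner;
        private final boolean forwardOpen;

        WrapperSketch(FixedPartitioner partitioner, boolean forwardOpen) {
            this.partitioner = partitioner;
            this.forwardOpen = forwardOpen;
        }

        void open(int subtaskIndex, int parallelism) {
            if (forwardOpen) {
                partitioner.open(subtaskIndex, parallelism);
            }
            // When forwardOpen is false, the partitioner is never initialized.
        }

        int targetPartition(int[] partitions) {
            return partitioner.partition(partitions);
        }
    }

    /** Returns the partition chosen by the given subtask. */
    static int route(int subtaskIndex, int parallelism, int[] partitions, boolean forwardOpen) {
        WrapperSketch wrapper = new WrapperSketch(new FixedPartitioner(), forwardOpen);
        wrapper.open(subtaskIndex, parallelism);
        return wrapper.targetPartition(partitions);
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask
                    + ": without open -> " + route(subtask, 4, partitions, false)
                    + ", with open -> " + route(subtask, 4, partitions, true));
        }
    }
}
```

In this sketch, every subtask lands on partition 0 when *open* is not
forwarded, while forwarding it spreads the subtasks across the partitions.
That suggests the wrapper should pass the subtask index and parallelism on
to the wrapped partitioner before the first record is serialized.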
