Hi community,

By the way, in *FlinkKafkaProducer#initProducer*, the
*flinkKafkaPartitioner* is only opened when it is NOT null. Unfortunately,
it is null in this case, because the field is set to null whenever a
*KafkaSerializationSchemaWrapper* is provided in the constructor arguments.

[image: image.png]
[image: image.png]

Together, these logic flaws lead to this serious bug. We recommend
initializing the wrapped FlinkKafkaPartitioner in
KafkaSerializationSchemaWrapper#open.
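
To make the failure mode and the suggested fix concrete, here is a minimal
sketch using simplified stand-in classes. These are NOT the real Flink
classes: the Partitioner interface, FixedPartitioner, SchemaWrapper, the
forwardOpen flag and the open(int, int) hook on the wrapper are all
assumptions made for illustration. Only the partition arithmetic
(partitions[parallelInstanceId % partitions.length]) is modeled on
FlinkFixedPartitioner.

```java
import java.util.Arrays;

// Simplified stand-ins for illustration only; not the real Flink classes.
class PartitionerOpenSketch {

    /** Stand-in for FlinkKafkaPartitioner (record/key/value parameters omitted). */
    interface Partitioner {
        void open(int parallelInstanceId, int parallelInstances);
        int partition(int[] partitions);
    }

    /** Modeled on FlinkFixedPartitioner: each subtask sticks to one partition. */
    static class FixedPartitioner implements Partitioner {
        private int parallelInstanceId; // keeps the default 0 if open() is never called

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            this.parallelInstanceId = parallelInstanceId;
        }

        @Override
        public int partition(int[] partitions) {
            return partitions[parallelInstanceId % partitions.length];
        }
    }

    /** Stand-in for KafkaSerializationSchemaWrapper. */
    static class SchemaWrapper {
        private final Partitioner partitioner;
        private final boolean forwardOpen; // hypothetical switch: false = reported behavior

        SchemaWrapper(Partitioner partitioner, boolean forwardOpen) {
            this.partitioner = partitioner;
            this.forwardOpen = forwardOpen;
        }

        /** Hypothetical open hook that forwards to the wrapped partitioner. */
        void open(int subtaskIndex, int parallelism) {
            if (forwardOpen && partitioner != null) {
                partitioner.open(subtaskIndex, parallelism);
            }
        }

        int targetPartition(int[] partitions) {
            return partitioner.partition(partitions);
        }
    }

    /** Returns the partition picked by each of the parallel subtasks. */
    static int[] partitionsChosen(int parallelism, int[] partitions, boolean forwardOpen) {
        int[] chosen = new int[parallelism];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            SchemaWrapper wrapper = new SchemaWrapper(new FixedPartitioner(), forwardOpen);
            wrapper.open(subtask, parallelism);
            chosen[subtask] = wrapper.targetPartition(partitions);
        }
        return chosen;
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};
        // open() never reaches the partitioner: every subtask picks partition 0
        System.out.println(Arrays.toString(partitionsChosen(4, partitions, false)));
        // open() forwarded by the wrapper: subtasks spread over the partitions
        System.out.println(Arrays.toString(partitionsChosen(4, partitions, true)));
    }
}
```

With four subtasks and four partitions, the first call yields [0, 0, 0, 0]
and the second yields [0, 1, 2, 3]. How the real wrapper would obtain the
subtask index and parallelism is left open here.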

Sincerely,
Weike


On Thu, Sep 3, 2020 at 8:15 PM DONG, Weike <kyled...@connect.hku.hk> wrote:

> Hi community,
>
> We have found a serious issue with the newly introduced
> *KafkaSerializationSchemaWrapper* class, which causes *FlinkKafkaProducer*
> to write only to partition 0 of the given Kafka topic under certain
> conditions.
>
> First, let's look at this constructor in the universal version of
> *FlinkKafkaProducer*, which uses FlinkFixedPartitioner as the custom
> partitioner.
>
> [image: image.png]
>
> When we trace down the call path, *KafkaSerializationSchemaWrapper* is
> used to wrap the aforementioned custom partitioner, i.e.
> *FlinkFixedPartitioner*.
>
> [image: image.png]
>
> However, we found that the implementation of
> *KafkaSerializationSchemaWrapper* does not call the *open* method of
> the given partitioner, which is essential for the partitioner to initialize
> its internal state, such as *parallelInstanceId* in
> *FlinkFixedPartitioner*.
>
> Therefore, when *KafkaSerializationSchemaWrapper#serialize* is later
> called by the FlinkKafkaProducer, *FlinkFixedPartitioner#partition*
> always returns 0, because *parallelInstanceId* is never properly
> initialized.
> [image: image.png]
>
> As a result, all of the data goes to partition 0 of the given Kafka
> topic, creating severe data skew in the sink.
