Assuming you’re not doing custom partitioning, another workaround is to 
pass Optional.empty() for the partitioner, so that Kafka’s own partitioning 
is used instead of a Flink partitioner.

Or at least that worked for us, when we encountered this same issue.
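The failure mode discussed in the quoted thread below can be sketched with a small, self-contained mimic. Note this is hypothetical illustration code, not the real org.apache.flink.* classes: an "unopened" fixed partitioner keeps its subtask index at the default 0 and therefore always picks partition 0, while passing no partitioner at all leaves the partition null so the Kafka producer's own partitioner decides.

```java
import java.util.Optional;

// Simplified mimic of FlinkFixedPartitioner -- hypothetical class, for
// illustration only. The real class stores the subtask index in open().
class MimicFixedPartitioner {
    private int parallelInstanceId; // set by open(); defaults to 0

    void open(int subtaskIndex) {
        this.parallelInstanceId = subtaskIndex;
    }

    // Sticky mapping from subtask to partition, as in the real partitioner.
    int partition(int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }
}

public class PartitionerBugDemo {
    // Mimics the sink's partition choice: with no Flink partitioner,
    // return null so the Kafka client's partitioner takes over.
    static Integer choosePartition(Optional<MimicFixedPartitioner> partitioner,
                                   int[] partitions) {
        return partitioner.map(p -> p.partition(partitions)).orElse(null);
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3};

        // Bug scenario: open() is never called, so every subtask computes
        // partitions[0 % 4], i.e. partition 0, regardless of its index.
        for (int subtask = 0; subtask < 4; subtask++) {
            MimicFixedPartitioner unopened = new MimicFixedPartitioner();
            System.out.println("unopened subtask " + subtask + " -> "
                    + unopened.partition(partitions)); // always 0
        }

        // Correct behaviour: open() seeds the subtask index, spreading
        // subtasks across partitions.
        for (int subtask = 0; subtask < 4; subtask++) {
            MimicFixedPartitioner opened = new MimicFixedPartitioner();
            opened.open(subtask);
            System.out.println("opened subtask " + subtask + " -> "
                    + opened.partition(partitions));
        }

        // Workaround: no Flink partitioner -> partition stays null and
        // the Kafka producer decides.
        System.out.println("no partitioner -> "
                + choosePartition(Optional.empty(), partitions)); // null
    }
}
```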

— Ken

> On Sep 3, 2020, at 5:36 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> 
> Thank you for the thorough investigation.  I totally agree with you. I 
> created an issue for it[1]. Will try to fix it as soon as possible and 
> include it in 1.11.2 and 1.12. 
> You can work around this by using the KafkaSerializationSchema 
> directly with the KafkaContextAware interface.
> Best,
> 
> Dawid
> 
> [1] https://issues.apache.org/jira/browse/FLINK-19133
> On 03/09/2020 14:24, DONG, Weike wrote:
>> Hi community,
>> 
>> And by the way, during FlinkKafkaProducer#initProducer, the 
>> flinkKafkaPartitioner is only opened when it is NOT null, which is 
>> unfortunately not the case here, because it is set to null when a 
>> KafkaSerializationSchemaWrapper is provided in the arguments of the 
>> constructor.
>> 
>> 
>> These logic flaws together lead to this serious bug, and we recommend 
>> that initialization of the FlinkKafkaPartitioner be done in 
>> KafkaSerializationSchemaWrapper#open.
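The suggested fix can be sketched as follows. Again, these are hypothetical mimic classes for illustration, not the real Flink ones: the point is simply that the wrapper's open() must forward to the wrapped partitioner's open() so that serialize() later sees an initialized subtask index.

```java
// Hypothetical, simplified mimic of the suggested fix -- not the real
// org.apache.flink.* classes.
class MimicPartitioner {
    int parallelInstanceId; // defaults to 0 when open() is skipped (the bug)

    void open(int subtaskIndex) {
        this.parallelInstanceId = subtaskIndex;
    }

    int partition(int numPartitions) {
        return parallelInstanceId % numPartitions;
    }
}

class MimicSchemaWrapper {
    private final MimicPartitioner partitioner;

    MimicSchemaWrapper(MimicPartitioner partitioner) {
        this.partitioner = partitioner;
    }

    // The proposed fix: opening the wrapper also opens the partitioner,
    // so later calls to serialize() use the correct subtask index.
    void open(int subtaskIndex) {
        if (partitioner != null) {
            partitioner.open(subtaskIndex);
        }
    }

    int serialize(int numPartitions) {
        return partitioner.partition(numPartitions);
    }
}
```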
>> 
>> Sincerely,
>> Weike
>> 
>> 
>> On Thu, Sep 3, 2020 at 8:15 PM DONG, Weike <kyled...@connect.hku.hk> wrote:
>> Hi community,
>> 
>> We have found a serious issue with the newly-introduced 
>> KafkaSerializationSchemaWrapper class, which eventually makes 
>> FlinkKafkaProducer write only to partition 0 of the given Kafka topic under 
>> certain conditions.
>> 
>> First let's look at this constructor in the universal version of 
>> FlinkKafkaProducer, which uses FlinkFixedPartitioner as the default custom 
>> partitioner.
>> 
>> 
>> And when we trace down the call path, KafkaSerializationSchemaWrapper is 
>> used to wrap the aforementioned custom partitioner, i.e. 
>> FlinkFixedPartitioner.
>> 
>> 
>> However, we found that the implementation of 
>> KafkaSerializationSchemaWrapper does not call the open method of the 
>> given partitioner, which is essential for the partitioner to initialize its 
>> internal state, such as parallelInstanceId in FlinkFixedPartitioner.
>> 
>> Therefore, when KafkaSerializationSchemaWrapper#serialize is later called by 
>> the FlinkKafkaProducer, FlinkFixedPartitioner#partition always returns 0, 
>> because parallelInstanceId is not properly initialized.
>> 
>> Eventually, all of the data would go only to partition 0 of the given Kafka 
>> topic, creating severe data skew in the sink.
>> 
>> 
>> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
