Assuming you’re not doing custom partitioning, another workaround is to pass Optional.empty() for the partitioner, so that Kafka's own partitioning is used instead of a Flink partitioner.
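A minimal sketch of that, using the SerializationSchema-based constructor overload of the universal connector (the class name, topic, and broker address below are placeholders, so adjust them to your job):

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class NoFlinkPartitionerExample {

    static FlinkKafkaProducer<String> buildProducer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder brokers

        // Passing Optional.empty() means no FlinkFixedPartitioner is wrapped at all,
        // so the Kafka producer's own default partitioning decides the partition
        // instead of the Flink-side partitioner that never gets open()-ed.
        return new FlinkKafkaProducer<>(
                "my-topic",                  // placeholder topic
                new SimpleStringSchema(),
                props,
                Optional.<FlinkKafkaPartitioner<String>>empty());
    }
}

With no Flink partitioner and no record key, the Kafka producer's default partitioner spreads records across all partitions instead of piling everything onto partition 0.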
Or at least that worked for us, when we encountered this same issue.

— Ken

> On Sep 3, 2020, at 5:36 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> Thank you for the thorough investigation. I totally agree with you. I created an issue for it [1]. Will try to fix it as soon as possible and include it in 1.11.2 and 1.12.
>
> The way you could work around this is by using the KafkaSerializationSchema directly with the KafkaContextAware interface.
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-19133
>
> On 03/09/2020 14:24, DONG, Weike wrote:
>> Hi community,
>>
>> And by the way, during FlinkKafkaProducer#initProducer, the flinkKafkaPartitioner is only opened when it is NOT null, which is unfortunately not the case here, because it is set to null if a KafkaSerializationSchemaWrapper is provided in the arguments of the constructor.
>>
>> [screenshots omitted]
>>
>> So these logic flaws eventually lead to this serious bug, and we recommend that initialization of FlinkKafkaPartitioners be done in KafkaSerializationSchemaWrapper#open.
>>
>> Sincerely,
>> Weike
>>
>>
>> On Thu, Sep 3, 2020 at 8:15 PM DONG, Weike <kyled...@connect.hku.hk> wrote:
>> Hi community,
>>
>> We have found a serious issue with the newly-introduced KafkaSerializationSchemaWrapper class, which can make FlinkKafkaProducer write only to partition 0 of the given Kafka topic under certain conditions.
>>
>> First, let's look at this constructor in the universal version of FlinkKafkaProducer; it uses FlinkFixedPartitioner as the custom partitioner.
>>
>> [screenshot omitted]
>>
>> When we trace down the call path, KafkaSerializationSchemaWrapper is used to wrap the aforementioned custom partitioner, i.e. FlinkFixedPartitioner.
>>
>> [screenshot omitted]
>>
>> However, we found that the implementation of KafkaSerializationSchemaWrapper does not call the open method of the given partitioner, which is essential for the partitioner to initialize state such as parallelInstanceId in FlinkFixedPartitioner.
>>
>> Therefore, when KafkaSerializationSchemaWrapper#serialize is later called by the FlinkKafkaProducer, FlinkFixedPartitioner#partition would always return 0, because parallelInstanceId is not properly initialized.
>>
>> [screenshot omitted]
>>
>> Eventually, all of the data would go only to partition 0 of the given Kafka topic, creating severe data skew in the sink.

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
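For anyone who wants to try the KafkaSerializationSchema plus KafkaContextAware route that Dawid mentions above, a rough sketch of such a schema for a String payload follows. The class and topic names are made up, and it assumes the producer populates the parallel-instance and partition information through the KafkaContextAware callbacks before serialize() is called (double-check the callback names against your Flink version):

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical schema: serializes Strings and picks the partition itself from the
// subtask / partition info that the producer passes in via KafkaContextAware,
// so it does not rely on FlinkFixedPartitioner ever being opened.
public class ContextAwareStringSchema
        implements KafkaSerializationSchema<String>, KafkaContextAware<String> {

    private final String topic;          // hypothetical topic name
    private int parallelInstanceId;      // index of this sink subtask, set by the producer
    private int[] partitions;            // partitions of the target topic, set by the producer

    public ContextAwareStringSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public void setParallelInstanceId(int parallelInstanceId) {
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public void setPartitions(int[] partitions) {
        this.partitions = partitions;
    }

    @Override
    public String getTargetTopic(String element) {
        return topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        byte[] value = element.getBytes(StandardCharsets.UTF_8);
        if (partitions == null || partitions.length == 0) {
            // Fall back to Kafka's own partitioning if the context was not populated.
            return new ProducerRecord<>(topic, value);
        }
        // Same intent as FlinkFixedPartitioner: each sink subtask sticks to one partition.
        int partition = partitions[parallelInstanceId % partitions.length];
        return new ProducerRecord<>(topic, partition, null, value);
    }
}

It would be passed to the producer via the KafkaSerializationSchema constructor, e.g. new FlinkKafkaProducer<>("my-topic", new ContextAwareStringSchema("my-topic"), props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE). Because the partition is chosen explicitly in serialize(), nothing depends on FlinkFixedPartitioner being opened.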