I wrote a FlinkRoundRobinPartitioner that extends FlinkKafkaPartitioner, and I use it as follows:

    KafkaSink kafkaSink = KafkaSink.builder()
        .setBootstrapServers(sinkServers)
        .setKafkaProducerConfig(props)
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic(sinkTopic)
            .setValueSerializationSchema(new SimpleStringSchema())
            .setPartitioner(new FlinkRoundRobinPartitioner<>())
            .build())
        .build();

However, when the number of partitions in the topic is increased, no data is written to the new partitions. Looking at the source code, this seems to be because the KafkaSinkContext cannot retrieve the partition list dynamically.

Is there any way to fix this?

Thanks,
Lei