It seems I don't need to define a FlinkRoundRobinPartitioner; I can just use the RoundRobinPartitioner supplied with Kafka:

    props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

This way, new partitions are discovered dynamically. However, there is a bug in RoundRobinPartitioner, and the distribution is still uneven:
https://issues.apache.org/jira/browse/KAFKA-9965

On Tue, Jul 30, 2024 at 4:20 PM Lei Wang <leiwang...@gmail.com> wrote:

> I wrote a FlinkRoundRobinPartitioner that extends FlinkKafkaPartitioner and
> use it as follows:
>
> KafkaSink kafkaSink = KafkaSink.builder()
>     .setBootstrapServers(sinkServers)
>     .setKafkaProducerConfig(props)
>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>     .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>         .setTopic(sinkTopic)
>         .setValueSerializationSchema(new SimpleStringSchema())
>         .setPartitioner(new FlinkRoundRobinPartitioner<>())
>         .build())
>     .build();
>
> But when the partition number is changed (becomes larger), no data is
> written to the new partitions.
> I looked at the source code, and it seems this is because the
> KafkaSinkContext cannot retrieve the partitions dynamically.
> Is there any way to fix this?
>
> Thanks,
> Lei
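
For reference, here is a minimal sketch of the round-robin selection discussed above, using only the JDK. RoundRobinSelector and RoundRobinDemo are hypothetical names, not part of Flink or Kafka. The sketch assumes the caller passes the current partition array on every call, so a larger array is picked up as soon as it is supplied. It does not address the stale partition list in KafkaSinkContext or the issue tracked in KAFKA-9965.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not a Flink or Kafka class): stateless about partition count. */
final class RoundRobinSelector {
    // Monotonic counter shared by all calls; thread-safe.
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Returns the next partition id in round-robin order.
     * The array length is read on every call, so newly added
     * partitions are used as soon as the caller supplies them.
     */
    int next(int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("partitions must not be empty");
        }
        // floorMod keeps the index non-negative even after counter overflow.
        int index = Math.floorMod(counter.getAndIncrement(), partitions.length);
        return partitions[index];
    }
}

class RoundRobinDemo {
    public static void main(String[] args) {
        RoundRobinSelector selector = new RoundRobinSelector();
        int[] before = {0, 1, 2};        // topic initially has 3 partitions
        int[] after = {0, 1, 2, 3, 4};   // topic grown to 5 partitions

        for (int i = 0; i < 3; i++) {
            System.out.println("before growth -> " + selector.next(before));
        }
        for (int i = 0; i < 5; i++) {
            System.out.println("after growth  -> " + selector.next(after));
        }
    }
}
```

The point of the sketch is only that the selector keeps no cached partition count. Whether the new partitions are reached still depends on the caller handing it an up-to-date array.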