It seems I don't need to define a FlinkRoundRobinPartitioner and can just use
the RoundRobinPartitioner supplied by Kafka:
props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,
RoundRobinPartitioner.class.getName());
This way, new partitions are discovered dynamically. However,
there's a bug i
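
For reference, here is a minimal sketch of the producer properties described above, written with plain string keys so it compiles without the Kafka client on the classpath. "partitioner.class" is the value behind ProducerConfig.PARTITIONER_CLASS_CONFIG. The metadata.max.age.ms setting is my own addition and an assumption about what you may want to tune: it bounds how long the producer reuses cached topic metadata, which should affect how quickly new partitions are noticed. The class name RoundRobinProducerProps and the 60000 ms value are hypothetical.

```java
import java.util.Properties;

public class RoundRobinProducerProps {

    // String value of ProducerConfig.PARTITIONER_CLASS_CONFIG in the Kafka client.
    static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";

    // Fully qualified name of Kafka's built-in round-robin partitioner.
    static final String ROUND_ROBIN_PARTITIONER =
            "org.apache.kafka.clients.producer.RoundRobinPartitioner";

    // Producer setting that bounds how long cached topic metadata is reused.
    // Assumption: not part of the original message, shown only for illustration.
    static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";

    // Builds the properties that would be handed to setKafkaProducerConfig(props).
    static Properties build(long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.setProperty(PARTITIONER_CLASS_CONFIG, ROUND_ROBIN_PARTITIONER);
        props.setProperty(METADATA_MAX_AGE_CONFIG, Long.toString(metadataMaxAgeMs));
        return props;
    }

    public static void main(String[] args) {
        // 60000 ms is a hypothetical value, not a recommendation.
        Properties props = build(60_000L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the Kafka client available, the two string constants can be replaced by ProducerConfig.PARTITIONER_CLASS_CONFIG and RoundRobinPartitioner.class.getName(), as in the snippet above.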
I wrote a FlinkRoundRobinPartitioner that extends FlinkKafkaPartitioner and
use it as follows:
KafkaSink kafkaSink = KafkaSink.builder()
    .setBootstrapServers(sinkServers)
    .setKafkaProducerConfig(props)
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .setRecordSerializer(KafkaRecordSerializationS