Hello Kafka users! I'm trying to implement or use a mechanism to make a Kafka producer imperatively update its metadata for a particular topic.
Here is the use case: we are adding partitions to topics programmatically because we want very strict control over how messages are published to particular partitions. We are using AdminUtils.addPartitions to achieve this. We then store the ID of the newly added partition in Zookeeper so that we persist a mapping from our domain key to a partition ID.

The problem we are facing right now is that the Kafka producer does not refresh its topic metadata for some time after the partition is added. Until it does, the producer cannot post to the new partition and throws this error:

Caused by: java.lang.IllegalArgumentException: Invalid partition given with record: 56 is not in the range [0...55].
    at org.apache.kafka.clients.producer.KafkaProducer.partition(KafkaProducer.java:717) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:459) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353) ~[kafka-clients-0.10.0.1.jar:na]

As I read somewhere (https://github.com/SOHU-Co/kafka-node/issues/175), the producer should try to recover from such an error by pulling the latest version of the topic metadata. This isn't happening: I keep getting these errors for about 60 seconds, until the producer is eventually able to publish to that partition.

In the previous version of Kafka (0.8) there was a producer setting called topic.metadata.refresh.interval.ms that was meant to make the producer pull that information. This is what I found about that setting in the 0.8 documentation:

"The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available...) "

Any ideas and comments on this are much appreciated.

Thanks
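
For reference, here is a minimal sketch of two workarounds that could be combined, offered as assumptions rather than a confirmed fix. The first lowers metadata.max.age.ms, the producer setting in the 0.10 client that bounds how long cached metadata is kept before a forced refresh (default 300000 ms). The second retries a send while it fails with IllegalArgumentException, assuming send() throws that exception synchronously as the stack trace above suggests. The class name, method names, and the timeout and backoff values are hypothetical, and the sketch uses only the JDK so that it compiles without kafka-clients on the classpath.

```java
import java.util.Properties;
import java.util.function.Supplier;

final class ProducerMetadataWorkaround {

    /**
     * Builds producer settings with a shortened metadata refresh interval.
     * The returned Properties would be passed to the KafkaProducer constructor.
     */
    static Properties producerConfig(String bootstrapServers, long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Forces a periodic metadata refresh; a lower value means newly added
        // partitions are noticed sooner, at the cost of more metadata requests.
        props.put("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }

    /**
     * Calls the given send action, retrying while it throws IllegalArgumentException
     * (assumed here to mean "partition not yet known to the producer").
     * Gives up and rethrows once another backoff would exceed the timeout.
     */
    static <T> T retryOnInvalidPartition(Supplier<T> send, long timeoutMs, long backoffMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        final long backoffNanos = backoffMs * 1_000_000L;
        while (true) {
            try {
                return send.get();
            } catch (IllegalArgumentException e) {
                if (deadline - System.nanoTime() < backoffNanos) {
                    throw e; // out of time: surface the original error
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    throw e;
                }
            }
        }
    }
}
```

With a real producer, usage would look something like retryOnInvalidPartition(() -> producer.send(record), 10000L, 200L). Note that the retry only helps once the producer's cached metadata has actually been refreshed, which is why it is paired with a lower metadata.max.age.ms; it also catches every IllegalArgumentException, so a genuinely invalid partition ID would be retried until the timeout as well.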