Hello Ismael, yes, I went through that PR and this is what I was actually looking for. Thanks!
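For anyone who cannot pick up the fix right away, one stopgap is to lower "metadata.max.age.ms" so the producer's cached partition count goes stale for a shorter time. Below is a minimal sketch of that configuration, not a tested recommendation. The class name, the broker address, and the 5-second value are made up for illustration; only the setting names come from the Kafka producer configuration. A lower value means more frequent metadata requests to the brokers.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): builds producer settings
// with a shorter metadata max age than the 300-second default.
class ProducerMetadataConfig {

    static Properties build(String bootstrapServers, long metadataMaxAgeMs) {
        if (metadataMaxAgeMs <= 0) {
            // Restriction chosen for this sketch only.
            throw new IllegalArgumentException("metadataMaxAgeMs must be positive");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound on how long cached topic metadata is used before
        // the producer forces a refresh.
        props.setProperty("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }
}
```

The result of something like ProducerMetadataConfig.build("localhost:9092", 5000L) can be passed to new KafkaProducer<>(props). Writes to a freshly added partition can still fail until the next refresh, so this only shortens the window.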
2016-10-12 14:37 GMT+02:00 Ismael Juma <ism...@juma.me.uk>:

> Hi Alexandru,
>
> I think your issue will be fixed by KAFKA-4254. There's a PR available and
> should be merged shortly. Can you please verify?
>
> Thanks,
> Ismael
>
> On Wed, Oct 12, 2016 at 11:00 AM, Alexandru Ionita <
> alexandru.ion...@gmail.com> wrote:
>
> > OK. then my question is: why is not the producer trying to recover from
> > this error by updating its topic metadata right away instead of waiting
> > for the "metadata.max.age.ms" to expire?
> >
> > 2016-10-12 11:43 GMT+02:00 Manikumar <manikumar.re...@gmail.com>:
> >
> > > we have similar setting "metadata.max.age.ms" in new producer api.
> > > Its default value is 300sec.
> > >
> > > On Wed, Oct 12, 2016 at 3:04 PM, Alexandru Ionita <
> > > alexandru.ion...@gmail.com> wrote:
> > >
> > > > Hello kafka users!!
> > > >
> > > > I'm trying implement/use a mechanism to make a Kafka producer
> > > > imperatively update its topic metadata for a particular topic.
> > > >
> > > > Here is the use case:
> > > >
> > > > we are adding partitions on topics programmatically because we want
> > > > to very strictly control how messages are published to particular
> > > > partitions.
> > > >
> > > > We are using AdminUtils.addPartitions to achieve this.
> > > > We then store the ID of the newly added partition in Zookeeper so
> > > > that we persist a mapping to a partition ID for our particular
> > > > domain key.
> > > >
> > > > The problem we are facing right now is that the Kafka producer won't
> > > > refresh its topic metadata until after a while, preventing the
> > > > producer from posting to those partitions by throwing an error :
> > > >
> > > > Caused by: java.lang.IllegalArgumentException: Invalid partition given with record: 56 is not in the range [0...55].
> > > >     at org.apache.kafka.clients.producer.KafkaProducer.partition(KafkaProducer.java:717) ~[kafka-clients-0.10.0.1.jar:na]
> > > >     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:459) ~[kafka-clients-0.10.0.1.jar:na]
> > > >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430) ~[kafka-clients-0.10.0.1.jar:na]
> > > >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353) ~[kafka-clients-0.10.0.1.jar:na]
> > > >
> > > > As I somewhere read
> > > > (https://github.com/SOHU-Co/kafka-node/issues/175), the producer
> > > > should try to recover from such error by pulling the latest version
> > > > of the topic metadata.
> > > >
> > > > This doesn't happening and I will keep getting those errors for like
> > > > 60 seconds until the producer eventually will be able to publish to
> > > > that partition.
> > > >
> > > > In the previous version of kafka (0.8) there was a producer setting
> > > > called topic.metadata.refresh.interval.ms that was aimed to make the
> > > > producer pull that information. This is what I found related to that
> > > > setting in the 0.8 documentation: "The producer generally refreshes
> > > > the topic metadata from brokers when there is a failure (partition
> > > > missing, leader not available...)"
> > > >
> > > > Any ideas and comments on this are much appreciated.
> > > > Thanks