Hi Shashank,

kafka-node is not developed by Apache or the Apache Kafka project. I don't think anyone here has looked at that code.
I have heard reports from the field that kafka-node has some very serious bugs, such as not retrying failed requests at all in some circumstances. I do not recommend using this client; https://github.com/Blizzard/node-rdkafka is a better choice. Keep in mind that node-rdkafka is also not part of the Apache Kafka project, so you may want to raise any issues with that project directly.

best,
Colin

On Tue, Oct 23, 2018, at 22:04, Shashank Sah wrote:
> When a broker host goes down or restarts, the Kafka client keeps sending
> new requests to the same host address. As a result, the requests fail
> with this error: Request timed out after 30000ms.
>
> Node version: v6.8.1
> Kafka-node version: 3.0.1
> Kafka version: 2.11-2.0.0
> Number of brokers: 3
> Number of partitions for topic: 10
>
> Some code pointers:
>
> "clusterConfig" : {
>     "kafkaHost": "localhost:9092,localhost:9093,localhost:9094",
>     "autoConnect": true
> }
> ...
> let kafkaClient = new kafka.KafkaClient(clusterConfig);
> producer = new kafka.HighLevelProducer(kafkaClient, cluster.producerConfig);
> Promise.promisifyAll(producer);
> ...
> producer.sendAsync([eventPayload])
>   .then(function (data) {
>     let topicName = eventPayload.topic;
>     let payLoadSize = (eventPayload || '').length;
>     logger.eventInfo(topicName, payLoadSize, source);
>   })
>   .catch(function (e) {
>     logger.produceFailedEvent(eventPayload, source);
>     throw Error.getErrorObject(errorType, e, topic, source);
>   });
>
> I have kept the other configurations at their defaults.
> There seems to be an issue in the kafka-node library. Below are the logs,
> showing the host each request was going to connect to. The first call
> succeeded, the second failed, and the third succeeded. In this case
> localhost:9092 was down.
>
> kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
> kafka-node:KafkaClient compressing messages if needed +76ms
> shashanksah actual broker!!!!![BrokerWrapper localhost:9094 (connected: true) (ready: true) (idle: false) (needAuthentication: false) (authenticated: false)]
> kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +928ms
> kafka-node:KafkaClient compressing messages if needed +422ms
> shashanksah actual broker!!!!![BrokerWrapper localhost:9092 (connected: false) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)]
> kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
> kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +583ms
> kafka-node:KafkaClient compressing messages if needed +280ms
> shashanksah actual broker!!!!![BrokerWrapper localhost:9093 (connected: true) (ready: true) (idle: false) (needAuthentication: false) (authenticated: false)]
> kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +723ms
>
> The issue is that the client picks the host that is down, waits for that
> host to become ready (for 30 seconds), and then fails the request. If the
> function ensureBrokerReady in this file:
> https://github.com/SOHU-Co/kafka-node/blob/master/lib/kafkaClient.js#L1016
> is changed to something like the following, the issue does not occur:
>
> const ensureBrokerReady = async.ensureAsync((leader, callback) => {
>   let broker = this.brokerForLeader(leader, longpolling);
>   console.log("shashanksah actual broker!!!!!" + broker);
>   if (!broker.isReady()) {
>     this.refreshBrokerMetadata();
>     broker = this.brokerForLeader(leader, longpolling);
>     //console.log("shashanksah broker not ready so refresh and retry!!!!!");
>   }
>   if (!broker.isReady()) {
>     //console.log("shashanksah !broker.isReady");
>     logger.debug('missing apiSupport waiting until broker is ready...');
>     this.waitUntilReady(broker, callback);
>   } else {
>     callback(null);
>   }
> });
>
> Please let me know if I am missing anything, or whether the RCA is correct.
>
> Thanks,
>
> Shashank
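Independently of whether the RCA is correct, failed sends like the ones above can be retried at the application level. The following is a minimal sketch, not part of kafka-node: `sendWithRetry` and `backoffDelay` are hypothetical helper names, and `send` is assumed to be any function returning a promise, such as the `producer.sendAsync` created by `Promise.promisifyAll` (presumably Bluebird) in the quoted code. It avoids async/await so that it also runs on Node v6.8.1.

```javascript
'use strict';

// Hypothetical helper: delay before the retry that follows failed attempt
// number `attempt` (1-based). Doubles each time: baseMs, 2*baseMs, 4*baseMs,
// ... capped at maxMs. No jitter, to keep the behaviour easy to reason about.
function backoffDelay(attempt, baseMs, maxMs) {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt - 1));
}

// Resolve after `ms` milliseconds.
function sleep(ms) {
  return new Promise(function (resolve) {
    setTimeout(resolve, ms);
  });
}

// Hypothetical helper: call send() and, if the returned promise rejects,
// retry with exponential backoff. Makes at most retries + 1 attempts and
// rejects with the last error once the retries are used up.
// Plain promise chains (no async/await) so it works on Node 6.
function sendWithRetry(send, options) {
  const opts = options || {};
  const retries = opts.retries !== undefined ? opts.retries : 3;
  const baseMs = opts.baseMs !== undefined ? opts.baseMs : 500;
  const maxMs = opts.maxMs !== undefined ? opts.maxMs : 5000;

  function attempt(n) {
    return send().catch(function (err) {
      if (n > retries) {
        throw err; // out of retries: surface the last error to the caller
      }
      return sleep(backoffDelay(n, baseMs, maxMs)).then(function () {
        return attempt(n + 1);
      });
    });
  }

  return attempt(1);
}
```

With the producer from the quoted code, usage would look like `sendWithRetry(() => producer.sendAsync([eventPayload]), { retries: 3, baseMs: 500 })`. Whether a retry actually reaches a live broker depends on the client having refreshed its metadata in the meantime; with the behaviour described above, each attempt may still wait out the 30-second timeout on the dead host. Note also that retrying a send that timed out can duplicate a message if the broker did in fact write it, so this gives at-least-once delivery.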