When a broker host goes down or restarts, the Kafka client keeps sending new
requests to that same host's address. As a result, the requests fail with
this error: Request timed out after 30000ms.

Node version: v6.8.1
Kafka-node version: 3.0.1
Kafka version: 2.11-2.0.0
Number of Brokers: 3
Number partitions for topic: 10

Some code pointers:

    "clusterConfig" : {
      "kafkaHost": "localhost:9092,localhost:9093,localhost:9094",
      "autoConnect": true
    }
    ...
    let kafkaClient = new kafka.KafkaClient(clusterConfig);
    producer = new kafka.HighLevelProducer(kafkaClient,
cluster.producerConfig);
    Promise.promisifyAll(producer);
    ...
    producer.sendAsync([eventPayload])
      .then(function (data) {
        let topicName = eventPayload.topic;
        let payLoadSize = (eventPayload || '').length;
        logger.eventInfo(topicName, payLoadSize, source);
      })
      .catch(function (e) {
        logger.produceFailedEvent(eventPayload, source);
        throw Error.getErrorObject(errorType, e, topic, source);
      });

I have kept the other configurations at their defaults.
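
For reference, a minimal sketch of the same client with the relevant timeout/retry
options written out explicitly (option names are from the kafka-node README; the
connectRetryOptions values are only illustrative, and requestTimeout 30000 is the
default that matches the 30000ms in the error above):

    let kafkaClient = new kafka.KafkaClient({
      kafkaHost: 'localhost:9092,localhost:9093,localhost:9094',
      autoConnect: true,
      connectTimeout: 10000,    // default connect timeout in ms
      requestTimeout: 30000,    // default request timeout in ms -> "Request timed out after 30000ms"
      connectRetryOptions: {    // retry options for the initial connection, passed to the retry module
        retries: 5,
        factor: 2,
        minTimeout: 1000,
        maxTimeout: 60000,
        randomize: true
      }
    });
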
It seems there is some issue with the kafka-node library. Given below are the
debug logs along with the hosts the client tries to connect to. The first call
was successful, the second failed, and the third was successful. In this case
localhost:9092 was down.

kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
kafka-node:KafkaClient compressing messages if needed +76ms
shashanksah actual broker!!!!![BrokerWrapper localhost:9094 (connected: true) (ready: true) (idle: false) (needAuthentication: false) (authenticated: false)]
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +928ms
kafka-node:KafkaClient compressing messages if needed +422ms
shashanksah actual broker!!!!![BrokerWrapper localhost:9092 (connected: false) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)]
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +583ms
kafka-node:KafkaClient compressing messages if needed +280ms
shashanksah actual broker!!!!![BrokerWrapper localhost:9093 (connected: true) (ready: true) (idle: false) (needAuthentication: false) (authenticated: false)]
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +723ms

The issue is that the client picks the broker for the host which is down,
waits for that host to become ready (for 30 seconds) and then fails the
request. If we change the function ensureBrokerReady in this file:
https://github.com/SOHU-Co/kafka-node/blob/master/lib/kafkaClient.js#L1016
to something like the code given below, this issue no longer occurs:

  const ensureBrokerReady = async.ensureAsync((leader, callback) => {
    let broker = this.brokerForLeader(leader, longpolling);
    console.log('shashanksah actual broker!!!!!' + broker); // debug output seen in the logs above
    // New: if the cached broker for this leader is not ready (e.g. the host is
    // down), refresh the broker metadata and resolve the leader again instead
    // of waiting on the dead connection.
    if (!broker.isReady()) {
      this.refreshBrokerMetadata();
      broker = this.brokerForLeader(leader, longpolling);
    }
    if (!broker.isReady()) {
      logger.debug('missing apiSupport waiting until broker is ready...');
      this.waitUntilReady(broker, callback);
    } else {
      callback(null);
    }
  });
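
As an application-level mitigation until something like the above lands in the
library, the send could also be retried once after forcing a metadata refresh
when it times out. A rough sketch (reusing the producer/kafkaClient/eventPayload
placeholders from the snippet above; refreshMetadata is the KafkaClient method
that re-fetches topic metadata):

  // Rough workaround sketch: on a request timeout, refresh metadata for the
  // topic so the client re-resolves the partition leaders, then retry once.
  const refreshMetadataAsync = Promise.promisify(kafkaClient.refreshMetadata, { context: kafkaClient });

  function sendWithSingleRetry(eventPayload) {
    return producer.sendAsync([eventPayload])
      .catch(function (e) {
        if (!/timed out/i.test(e.message || '')) {
          throw e;   // some other failure, not the timeout described above
        }
        return refreshMetadataAsync([eventPayload.topic])
          .then(function () {
            return producer.sendAsync([eventPayload]);
          });
      });
  }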

Please tell me if I am missing anything or whether the RCA is correct.

Thanks,

Shashank
