Thank you, Ewen. This is behavior I'm actually observing; the logs show continuous connect failures to the dead broker.
The important thing here is that I'm starting a brand new instance of the Producer after a broker is already down (so there is no prior metadata), with that down broker also in the bootstrap list. With a brand new instance, all send requests are blocked until the metadata is fetched, and the metadata fetch is where I'm seeing the issue. The code currently picks a random node to fetch the metadata from, and if that happens to be the down node, I see the connect failure and then it tries to fetch the metadata from the same node again (I do not see it going into blackout, because the state stays "CONNECTING" and the other nodes are not yet connected). This goes on forever until I either bring the broker back up or kill and restart the Producer; on restart, if it picks a different node, it gets the metadata fine. Once it has the metadata everything works as you described above and it updates the Cluster nodes.

This can be a problem for us because we hand out a standard set of bootstrap brokers to multiple producers whose lifecycle is not under our control. Producers can go down and new instances can be brought up, just like brokers, where we already expect a broker to go down (which is why we use more partitions and replication).

The exception I get is:

java.net.ConnectException: Connection refused: no further information
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_67]
        at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.7.0_67]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
        at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]

I think the node never goes into blackout because this exception is raised in poll() and not in connect(). The SocketChannel javadoc also says that connect() only initiates the connection (as long as the DNS entry resolves), so the failure only surfaces later, in poll(), when finishConnect() is called. And in poll() the connection state is not reset to DISCONNECTED, so the node is never blacked out.
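To illustrate what I mean, here is a minimal standalone NIO sketch (not the client code itself; "broker1" and 9092 are placeholders for the dead bootstrap broker). With a non-blocking channel, connect() only initiates the handshake, and "Connection refused" shows up later, when the selector fires and finishConnect() throws:

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NonBlockingConnectDemo {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);

        // With a non-blocking channel, connect() only initiates the handshake.
        // As long as the name resolves, it normally returns false here
        // ("connection pending") even if nothing is listening on the port.
        // "broker1":9092 is a placeholder for the broker that is down.
        boolean connectedNow = channel.connect(new InetSocketAddress("broker1", 9092));
        System.out.println("connect() returned " + connectedNow);

        channel.register(selector, SelectionKey.OP_CONNECT);

        // The refusal only surfaces here, when the selector reports the
        // channel as connectable and finishConnect() throws. This is the
        // same point as Selector.poll() in the stack trace above.
        selector.select(5000);
        for (SelectionKey key : selector.selectedKeys()) {
            if (key.isConnectable()) {
                try {
                    ((SocketChannel) key.channel()).finishConnect();
                } catch (ConnectException e) {
                    System.out.println("finishConnect() failed: " + e.getMessage());
                }
            }
        }
        channel.close();
        selector.close();
    }
}

So the failure never hits the connect() call itself; it is only visible from the poll loop, after the state has already been set to CONNECTING.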
On Fri, Aug 21, 2015 at 10:06 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

> Are you seeing this in practice or is this just a concern about the way the
> code currently works? If the broker is actually down and the host is
> rejecting connections, the situation you describe shouldn't be a problem.
> It's true that the NetworkClient chooses a fixed nodeIndexOffset, but the
> expectation is that if we run one iteration of leastLoadedNode and select a
> node, we'll try to connect and any failure will be handled by putting that
> node into a blackout period during which subsequent calls to
> leastLoadedNode will give priority to other options. If your server is
> *not* explicitly rejecting connections, I think it could be possible that
> we end up hanging for a long while just waiting for that connection. If
> this is the case (e.g., if you are running on EC2 and it has this behavior
> -- I believe default firewall rules will not kill the connection), this
> would be useful to know.
>
> A couple of bugs you might want to be aware of:
>
> https://issues.apache.org/jira/browse/KAFKA-1843 is meant to generally
> address the fact that there are a lot of states that we could be in, and
> the way we handle them (especially with leastLoadedNode) may not work well
> in all cases. It's very difficult to be comprehensive here, so if there is
> a scenario that is now failing for you, the more information you can give
> about the state of the system and the producer, the better.
>
> https://issues.apache.org/jira/browse/KAFKA-1842 might also be relevant --
> right now we rely on the underlying TCP connection timeouts, but this is
> definitely not ideal. They can be quite long by default, and we might want
> to consider connections failed much sooner.
>
> I also could have sworn there was a JIRA filed about the fact that the
> bootstrap servers are never reused, but I can't find it at the moment -- in
> some cases, if you have no better option then it would be best to revert
> back to the original set of bootstrap servers for loading metadata. This
> can especially become a problem in some cases where you're only producing to
> one or a small number of topics and therefore only have metadata for a
> couple of servers. If anything happens to those servers too quickly (within
> the metadata refresh period) you might potentially get stuck with only
> references to dead nodes.
>
> -Ewen
>
> On Fri, Aug 21, 2015 at 6:56 PM, Kishore Senji <kse...@gmail.com> wrote:
>
> > If one of the brokers we specify in the bootstrap servers list is down,
> > there is a chance that the Producer (a brand new instance with no prior
> > metadata) will never be able to publish anything to Kafka until that
> > broker is up. The logic for getting the initial metadata is based on a
> > random index into the set of bootstrap nodes, and if it happens to be
> > the down node, the Kafka producer keeps trying to get the metadata from
> > that node only. It never switches to another node. Without metadata,
> > the Producer can never send anything.
> >
> > The nodeIndexOffset is chosen at the creation of the NetworkClient (and
> > this offset is not changed when we fail to get a new connection), so
> > when fetching the metadata for the first time there is a possibility
> > that we keep trying the broker that is down.
> >
> > This can be a problem if a broker goes down and a Producer is also
> > restarted or a new instance is brought up. Is this a known issue?
>
> --
> Thanks,
> Ewen