Yes, that is exactly the issue. I did not notice that close(key) is called from the poll() method as well. I was observing this even when running my app normally (not in the debugger). With some conditional debugging I could see the connect attempt taking about 1 sec, and, as you mentioned, the default for reconnect.backoff.ms is 10 ms, so the backoff period had already elapsed by the time the failure was handled.

If I increase reconnect.backoff.ms to 5 sec, then it does pick another node.
[image: connect.time.PNG]
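A minimal sketch of that override (the broker addresses and serializers below are placeholders, not the actual settings used here):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BackoffConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder bootstrap list; one of these brokers may be down.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "broker1:9092,broker2:9092,broker3:9092");
        // Raise the backoff from the 10 ms default so a failed connect
        // attempt keeps the node blacked out long enough for
        // leastLoadedNode to prefer a different broker for the metadata fetch.
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "5000");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<byte[], byte[]>(props);
        // ... send records ...
        producer.close();
    }
}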
On Sat, Aug 22, 2015 at 3:56 PM Ewen Cheslack-Postava <e...@confluent.io> wrote:

> You're just seeing that exception in the debugger, not the log, right?
>
> ConnectException is an IOException, so it should be caught by this block
> (https://github.com/apache/kafka/blob/0.8.2.1/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L271),
> logged, and then the SelectionKey should be closed. Part of the close
> process adds it to the list of disconnections if there's an associated
> transmission object (which there should be, it is set up in the connect()
> call). This list is then processed by NetworkClient in
> handleDisconnections, which is invoked after the poll() call. That, in
> turn, marks the node as disconnected via the ClusterConnectionStates
> object. So it should still be getting marked as disconnected.
>
> However, maybe the issue is in the way we handle the blackout period. The
> corresponding setting is ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG
> (reconnect.backoff.ms). The default value is 10 ms. However, the way this
> is currently handled, we mark the time when we *start* the connection
> attempt. If it takes more than 10 ms for the connection attempt to fail,
> then the blackout period wouldn't actually apply, since the period would
> have already elapsed. If that happened, then leastLoadedNode would indeed
> continue to select the same node repeatedly.
>
> Can you tell from the logs how long the connection attempts are taking?
> You could try increasing the backoff time, although that has a broader
> impact that could be negative (e.g., if a broker is temporarily down and
> you aren't stuck in this metadata fetch state, it increases the amount of
> time before you can start producing to that broker again). However, if you
> can't verify that this is the problem from the logs, it might at least
> help to verify in a test environment.
>
> I've filed https://issues.apache.org/jira/browse/KAFKA-2459 for that
> issue.
>
> -Ewen
>
>
> On Fri, Aug 21, 2015 at 11:42 PM, Kishore Senji <kse...@gmail.com> wrote:
>
> > Thank you Ewen. This behavior is something that I'm observing. I see in
> > the logs continuous connect failures to the dead broker.
> >
> > The important thing here is that I'm starting a brand new instance of
> > the Producer after a broker is down (so no prior metadata), with that
> > down broker also as part of the bootstrap list. With the brand new
> > instance, all requests to send are blocked until the metadata is
> > fetched. The metadata fetching is where I'm seeing the issue. Currently
> > the code randomly picks a node to fetch the metadata, and if it happens
> > to be the down node, I see the connect failure and then it tries to
> > fetch metadata again from the same node (I do not see it going into
> > blackout because the status is always "CONNECTING" and other nodes are
> > not yet connected). This goes on forever until I either bring the
> > broker up or kill & restart the Producer, and on restart, if it picks a
> > different node, it works to get the metadata. Once it gets the metadata
> > it is fine: as you described above, it updates the Cluster nodes.
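A simplified sketch of the backoff bookkeeping described above (the class, field, and method names are only illustrative, not the actual ClusterConnectionStates code):

// Illustrative sketch: the connect-attempt timestamp is recorded when the
// attempt *starts*, so a failure that takes longer than reconnectBackoffMs
// to surface leaves the node immediately eligible again.
class NodeConnectionStateSketch {
    private long lastConnectAttemptMs;
    private boolean connecting;

    void startConnecting(long nowMs) {
        connecting = true;
        lastConnectAttemptMs = nowMs;   // marked at the start of the attempt
    }

    void disconnected() {
        connecting = false;             // the timestamp is not refreshed here
    }

    boolean canConnect(long nowMs, long reconnectBackoffMs) {
        return !connecting && nowMs - lastConnectAttemptMs >= reconnectBackoffMs;
    }

    public static void main(String[] args) throws InterruptedException {
        NodeConnectionStateSketch state = new NodeConnectionStateSketch();
        state.startConnecting(System.currentTimeMillis());
        Thread.sleep(1000);             // connect attempt takes ~1 s to fail
        state.disconnected();
        // true with the 10 ms default: the "blackout" has already elapsed,
        // so leastLoadedNode can pick the same dead node again
        System.out.println(state.canConnect(System.currentTimeMillis(), 10));
        // false with a 5000 ms backoff: the node stays blacked out
        System.out.println(state.canConnect(System.currentTimeMillis(), 5000));
    }
}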
> > This can be a problem because we have to give a standard set of
> > bootstrap brokers across multiple producers whose lifecycle is not in
> > our control. The producers can go down and a new instance can be
> > brought up, just like the brokers, where we expect a broker going down
> > (so we do more partitioning and replication).
> >
> > I get this exception -
> >
> > java.net.ConnectException: Connection refused: no further information
> >   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_67]
> >   at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.7.0_67]
> >   at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
> >   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
> >   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
> >   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
> >   at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]
> >
> > I think the status never goes to blackout because this exception really
> > happens in poll() and not in the connect() method; the javadoc also
> > mentions that the call is only initiated there, and (as long as the DNS
> > entry exists) it only fails to connect in the poll() method. And in the
> > poll() method the status is not reset to DISCONNECTED, so it is not
> > blacked out.
> >
> >
> > On Fri, Aug 21, 2015 at 10:06 PM, Ewen Cheslack-Postava
> > <e...@confluent.io> wrote:
> >
> > > Are you seeing this in practice or is this just a concern about the
> > > way the code currently works? If the broker is actually down and the
> > > host is rejecting connections, the situation you describe shouldn't
> > > be a problem. It's true that the NetworkClient chooses a fixed
> > > nodeIndexOffset, but the expectation is that if we run one iteration
> > > of leastLoadedNode and select a node, we'll try to connect and any
> > > failure will be handled by putting that node into a blackout period
> > > during which subsequent calls to leastLoadedNode will give priority
> > > to other options. If your server is *not* explicitly rejecting
> > > connections, I think it could be possible that we end up hanging for
> > > a long while just waiting for that connection. If this is the case
> > > (e.g., if you are running on EC2 and it has this behavior -- I
> > > believe default firewall rules will not kill the connection), this
> > > would be useful to know.
> > >
> > > A couple of bugs you might want to be aware of:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-1843 is meant to
> > > generally address the fact that there are a lot of states that we
> > > could be in, and the way we handle them (especially with
> > > leastLoadedNode) may not work well in all cases. It's very difficult
> > > to be comprehensive here, so if there is a scenario that is not
> > > working for you, the more information you can give about the state
> > > of the system and the producer, the better.
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-1842 might also be
> > > relevant -- right now we rely on the underlying TCP connection
> > > timeouts, but this is definitely not ideal. They can be quite long by
> > > default, and we might want to consider connections failed much
> > > sooner.
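A minimal standalone NIO sketch (not Kafka code; the host, port, and timeout are placeholders) of why a refused non-blocking connect only shows up at finishConnect() inside the poll loop, and why any shorter connect timeout has to be enforced by the caller rather than the OS TCP timeout:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NonBlockingConnectSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);

        // connect() only initiates the connection; for a resolvable host it
        // usually returns false here even if nothing is listening on the port.
        boolean connected = channel.connect(new InetSocketAddress("broker1", 9092));
        if (!connected) {
            channel.register(selector, SelectionKey.OP_CONNECT);
        }

        long connectTimeoutMs = 5000;   // illustrative application-level timeout
        long deadline = System.currentTimeMillis() + connectTimeoutMs;

        while (!connected && System.currentTimeMillis() < deadline) {
            selector.select(500);
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isConnectable()) {
                    try {
                        // A refused connection is reported here, in the poll
                        // loop, not at the original connect() call.
                        connected = ((SocketChannel) key.channel()).finishConnect();
                    } catch (IOException e) {
                        System.out.println("Connect failed during poll: " + e);
                        return;
                    }
                }
            }
            selector.selectedKeys().clear();
        }
        System.out.println(connected ? "Connected"
                : "Gave up after " + connectTimeoutMs + " ms");
    }
}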
> > > I also could have sworn there was a JIRA filed about the fact that
> > > the bootstrap servers are never reused, but I can't find it at the
> > > moment -- in some cases, if you have no better option, then it would
> > > be best to revert back to the original set of bootstrap servers for
> > > loading metadata. This can especially become a problem in cases where
> > > you're only producing to one or a small number of topics and
> > > therefore only have metadata for a couple of servers. If anything
> > > happens to those servers too quickly (within the metadata refresh
> > > period) you might potentially get stuck with only references to dead
> > > nodes.
> > >
> > > -Ewen
> > >
> > > On Fri, Aug 21, 2015 at 6:56 PM, Kishore Senji <kse...@gmail.com>
> > > wrote:
> > >
> > > > If one of the brokers we specify in the bootstrap servers list is
> > > > down, there is a chance that the Producer (a brand new instance
> > > > with no prior metadata) will never be able to publish anything to
> > > > Kafka until that broker is up, because the logic for getting the
> > > > initial metadata is based on a random index into the set of
> > > > bootstrap nodes, and if it happens to be the down node, the Kafka
> > > > producer keeps on trying to get the metadata from that node only.
> > > > It never switches to another node. Without metadata, the Producer
> > > > can never send anything.
> > > >
> > > > The nodeIndexOffset is chosen at the creation of the NetworkClient
> > > > (and this offset is not changed when we fail to get a new
> > > > connection), so when getting the metadata for the first time there
> > > > is a possibility that we keep on trying the broker that is down.
> > > >
> > > > This can be a problem if a broker goes down and a Producer is also
> > > > restarted or a new instance is brought up. Is this a known issue?
> > >
> > > --
> > > Thanks,
> > > Ewen
>
> --
> Thanks,
> Ewen