Yes, that is exactly the issue. I had not noticed that close(key) is called
from the poll() method as well. I was observing this even when running my app
normally (not in debug). With a conditional breakpoint I could see the connect
attempt taking about 1 second, and since, as you mentioned, the default
reconnect.backoff.ms is 10ms, the backoff had already elapsed by the time the
failure was recorded.
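
To make the timing concrete, here is a tiny sketch of my own (not the actual
ClusterConnectionStates code; the names are made up) of why the blackout never
kicks in with the default backoff:

// Simplified illustration of the backoff timing -- not real Kafka client code.
public class BackoffTimingSketch {
    public static void main(String[] args) {
        long reconnectBackoffMs = 10;     // default reconnect.backoff.ms
        long connectAttemptTookMs = 1000; // what I observed with the debugger

        // The client records the timestamp when the attempt *starts* ...
        long lastConnectAttemptMs = System.currentTimeMillis();
        // ... but the failure is only reported from poll(), roughly 1s later.
        long failureReportedAtMs = lastConnectAttemptMs + connectAttemptTookMs;

        boolean blackedOut =
            (failureReportedAtMs - lastConnectAttemptMs) < reconnectBackoffMs;
        // Prints false: the backoff has already elapsed by the time the failure
        // is seen, so leastLoadedNode is free to pick the same dead node again.
        System.out.println("blacked out? " + blackedOut);
    }
}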

If I increase reconnect.backoff.ms to 5 seconds, the producer does pick
another node.
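
For reference, this is roughly how I'm setting it with the new Java producer
(the bootstrap list and serializers below are just placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerWithLongerBackoff {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder bootstrap list -- one of these brokers may be down.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Default is 10ms; with a ~1s connect failure the blackout has already
        // elapsed by the time the failure is seen, so widen it well past the
        // observed connect time so leastLoadedNode moves on to another node.
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "5000");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}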

[image: connect.time.PNG]
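
And for completeness, a minimal plain-NIO sketch (my own, not the Kafka
Selector code; the address assumes no broker is listening on that port) showing
why the ConnectException only surfaces from the select/poll side via
finishConnect(), not from connect() itself:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NonBlockingConnectSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);

        // connect() only *initiates* the attempt; for a non-blocking channel it
        // normally returns false here rather than throwing for a dead endpoint.
        channel.connect(new InetSocketAddress("localhost", 9092)); // assumed dead broker
        channel.register(selector, SelectionKey.OP_CONNECT);

        selector.select(5000); // the "poll" step
        for (SelectionKey key : selector.selectedKeys()) {
            if (key.isConnectable()) {
                try {
                    // This is where "Connection refused" actually shows up.
                    ((SocketChannel) key.channel()).finishConnect();
                } catch (IOException e) {
                    // The Kafka client catches this, closes the key, and should
                    // record the node as disconnected / blacked out.
                    System.out.println("connect failed in poll: " + e);
                    key.cancel();
                    key.channel().close();
                }
            }
        }
        selector.close();
    }
}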


On Sat, Aug 22, 2015 at 3:56 PM Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> You're just seeing that exception in the debugger, not the log, right?
>
> ConnectException is an IOException, so it should be caught by this block
>
> https://github.com/apache/kafka/blob/0.8.2.1/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L271
> , logged, and then the SelectionKey should be closed. Part of the close
> process adds it to the list of disconnections if there's an associated
> transmission object (which there should be, it is set up in the connect()
> call). This list is then processed by NetworkClient in
> handleDisconnections, which is invoked after the poll() call. That, in
> turn, marks the node as disconnected via the ClusterConnectionStates
> object. So it should still be getting marked as disconnected.
>
> However, maybe the issue is in the way we handle the blackout period. The
> corresponding setting is ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG (
> reconnect.backoff.ms). The default value is 10ms. However, the way this is
> currently handled, we mark the time when we *start* the connection attempt.
> If it takes more than 10ms for the connection attempt to fail, then the
> blackout period wouldn't actually apply since the period would have
> already elapsed. If that happened, then leastLoadedNode would indeed
> continue to select the same node repeatedly.
>
> Can you tell from the logs how long the connection attempts are taking? You
> could try increasing the backoff time, although that has broader impact
> that could be negative (e.g., if a broker is temporarily down and you
> aren't stuck in this metadata fetch state, it increases the amount of time
> before you can start producing to that broker again). However, if you can't
> verify that this is the problem from the logs, it might at least help to
> verify in a test environment.
>
> I've filed https://issues.apache.org/jira/browse/KAFKA-2459 for that
> issue.
>
> -Ewen
>
>
> On Fri, Aug 21, 2015 at 11:42 PM, Kishore Senji <kse...@gmail.com> wrote:
>
> > Thank you Ewen. This is the behavior I'm observing. I see continuous
> > connect failures to the dead broker in the logs.
> >
> > The important thing here is that I'm starting a brand new instance of the
> > Producer after a broker is down (so there is no prior metadata), with that
> > down broker also part of the bootstrap list. With a brand new instance,
> > all send requests are blocked until the metadata is fetched, and the
> > metadata fetching is where I'm seeing the issue. Currently the code
> > randomly picks a node to fetch the metadata from, and if it happens to be
> > the down node, I see the connect failure and then it tries to fetch
> > metadata again from the same node (I do not see it going into blackout
> > because the status is always "CONNECTING" and the other nodes are not yet
> > connected). This goes on forever until I either bring the broker up or
> > kill and restart the Producer; if on restart it picks a different node, it
> > gets the metadata. Once it has the metadata it is fine because, as you
> > described above, it updates the Cluster nodes.
> >
> > This can be a problem because we have to give a standard set of bootstrap
> > brokers to multiple producers whose lifecycle is not in our control. The
> > producers can go down and a new instance can be brought up, just like the
> > brokers, where we expect a broker to go down from time to time (which is
> > why we do more partitioning and replication).
> >
> > I get this exception -
> >
> > java.net.ConnectException: Connection refused: no further information
> >     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_67]
> >     at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.7.0_67]
> >     at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
> >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
> >     at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]
> >
> > I think the status never goes to blackout because this exception actually
> > happens in the poll() method and not in the connect() method. The javadoc
> > also mentions that connect() only initiates the connection (as long as the
> > DNS entry resolves), and it only fails in the poll() method. And in the
> > poll() method the status is not reset to DISCONNECTED, so the node is not
> > blacked out.
> >
> >
> > On Fri, Aug 21, 2015 at 10:06 PM, Ewen Cheslack-Postava <e...@confluent.io>
> > wrote:
> >
> > > Are you seeing this in practice or is this just a concern about the way
> > > the code currently works? If the broker is actually down and the host is
> > > rejecting connections, the situation you describe shouldn't be a
> > > problem. It's true that the NetworkClient chooses a fixed
> > > nodeIndexOffset, but the expectation is that if we run one iteration of
> > > leastLoadedNode and select a node, we'll try to connect and any failure
> > > will be handled by putting that node into a blackout period during which
> > > subsequent calls to leastLoadedNode will give priority to other options.
> > > If your server is *not* explicitly rejecting connections, I think it
> > > could be possible that we end up hanging for a long while just waiting
> > > for that connection. If this is the case (e.g., if you are running on
> > > EC2 and it has this behavior -- I believe default firewall rules will
> > > not kill the connection), this would be useful to know.
> > >
> > > A couple of bugs you might want to be aware of:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-1843 is meant to generally
> > > address the fact that there are a lot of states that we could be in,
> > > and the way we handle them (especially with leastLoadedNode) may not
> > > work well in all cases. It's very difficult to be comprehensive here,
> > > so if there is a scenario that is not working for you, the more
> > > information you can give about the state of the system and the
> > > producer, the better.
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-1842 might also be relevant
> > > -- right now we rely on the underlying TCP connection timeouts, but this
> > > is definitely not ideal. They can be quite long by default, and we might
> > > want to consider connections failed much sooner.
> > >
> > > I also could have sworn there was a JIRA filed about the fact that the
> > > bootstrap servers are never reused, but I can't find it at the moment --
> > > in some cases, if you have no better option, then it would be best to
> > > revert back to the original set of bootstrap servers for loading
> > > metadata. This can especially become a problem in cases where you're
> > > only producing to one or a small number of topics and therefore only
> > > have metadata for a couple of servers. If anything happens to those
> > > servers too quickly (within the metadata refresh period) you might
> > > potentially get stuck with only references to dead nodes.
> > >
> > > -Ewen
> > >
> > > On Fri, Aug 21, 2015 at 6:56 PM, Kishore Senji <kse...@gmail.com> wrote:
> > >
> > > > If one of the brokers we specify in the bootstrap servers list is
> > > > down, there is a chance that the Producer (a brand new instance with
> > > > no prior metadata) will never be able to publish anything to Kafka
> > > > until that broker is up, because the logic for getting the initial
> > > > metadata is based on a random index into the set of bootstrap nodes,
> > > > and if that happens to be the down node, the Kafka producer keeps
> > > > trying to get the metadata from that node only. It never switches to
> > > > another node, and without metadata the Producer can never send
> > > > anything.
> > > >
> > > > The nodeIndexOffset is chosen at the creation of the NetworkClient
> > > > (and this offset is not changed when we fail to get a new connection),
> > > > so when fetching the metadata for the first time there is a
> > > > possibility that we keep trying the broker that is down.
> > > >
> > > > This can be a problem if a broker goes down and a Producer is
> > > > restarted or a new instance is brought up at the same time. Is this a
> > > > known issue?
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>
