After further debugging, I found the following error on the broken node (1011).
The error:

[2016-12-19 21:07:38,081] ERROR Error while accepting connection (kafka.network.Acceptor)
java.io.IOException: Too many open files
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
        at kafka.network.Acceptor.accept(SocketServer.scala:326)
        at kafka.network.Acceptor.run(SocketServer.scala:269)
        at java.lang.Thread.run(Thread.java:745)

On Mon, Dec 19, 2016 at 4:33 PM, Tony Liu <jiangtao....@zuora.com> wrote:

> Hi Experts,
>
> Has anyone run into this connection error?
>
> [2016-12-17 20:13:32,728] WARN [ReplicaFetcherThread-1-1011], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5dad549c (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 10.137.126.113:9092 (id: 1011 rack: null) failed
>         at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:83)
>         at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:93)
>         at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:248)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>
> By the way, I checked the source code, which says there are a number of
> reasons that can cause this error. I am not clear what those reasons are,
> so I don't know where to focus further investigation.
>
> /**
>  * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received or a
>  * disconnection happens (which can happen for a number of reasons including a request timeout).
>  *
>  * In case of a disconnection, an `IOException` is thrown.
>  *
>  * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
>  * care.
>  */
> def blockingSendAndReceive(request: ClientRequest)(implicit time: JTime): ClientResponse = {
>   client.send(request, time.milliseconds())
>
>   pollContinuously { responses =>
>     val response = responses.find { response =>
>       response.request.request.header.correlationId == request.request.header.correlationId
>     }
>     response.foreach { r =>
>       if (r.wasDisconnected) {
>         val destination = request.request.destination
>         throw new IOException(s"Connection to $destination was disconnected before the response was read")
>       }
>     }
>     response
>   }
>
> }
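
On the "Too many open files" error at the top of this message: a JVM can report how many file descriptors it has open against its limit. Below is a minimal Java sketch of that check, not something taken from Kafka itself. The class name `FdUsage` and the helper `usedFraction` are made up for illustration, and the program reports numbers for the JVM it runs in, not for the broker. On HotSpot/Unix the same two attributes (`OpenFileDescriptorCount`, `MaxFileDescriptorCount`) should be visible on the broker's `java.lang:type=OperatingSystem` MBean, assuming JMX is enabled there.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Hypothetical helper class, for illustration only.
public class FdUsage {

    // Fraction of the descriptor limit currently in use.
    public static double usedFraction(long open, long max) {
        if (max <= 0) {
            throw new IllegalArgumentException("max must be positive");
        }
        return (double) open / max;
    }

    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();

        // The descriptor counters are only exposed on Unix-like platforms.
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            long open = unix.getOpenFileDescriptorCount();
            long max = unix.getMaxFileDescriptorCount();
            System.out.printf("open=%d max=%d used=%.1f%%%n",
                    open, max, 100.0 * usedFraction(open, max));
        } else {
            System.out.println("File descriptor counts are not available on this platform");
        }
    }
}
```

If the open count on node 1011 sits close to the limit, that would be consistent with the `Acceptor` failing to accept new connections and with the other brokers' `ReplicaFetcherThread` connections to 1011 failing, though this sketch alone does not prove that link.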