Hmm, but if we hit an exception in BlockingChannel.connect(), we will call BlockingChannel.disconnect(), which will close the socket channel.
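A minimal sketch of that pattern (the class name is hypothetical and this is not the actual patch; it only illustrates closing the half-opened channel on any failure after open(), and keying disconnect() off the channel reference rather than the connected flag):

    import java.net.InetSocketAddress
    import java.nio.channels.{Channels, ReadableByteChannel, SocketChannel}

    class LeakFreeBlockingChannel(host: String, port: Int) {
      private val lock = new Object
      private var channel: SocketChannel = null
      private var readChannel: ReadableByteChannel = null
      private var connected = false

      def connect(): Unit = lock synchronized {
        if (!connected) {
          try {
            channel = SocketChannel.open()  // the fd is allocated here
            channel.configureBlocking(true)
            channel.connect(new InetSocketAddress(host, port))  // may throw
            readChannel = Channels.newChannel(channel.socket().getInputStream)
            connected = true
          } catch {
            case e: Throwable =>
              // release the fd even though connected is still false
              disconnect()
              throw e
          }
        }
      }

      def disconnect(): Unit = lock synchronized {
        // close whatever was opened, regardless of the connected flag
        if (readChannel != null) { readChannel.close(); readChannel = null }
        if (channel != null) { channel.close(); channel = null }
        connected = false
      }
    }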
Thanks,

Jun

On Tue, Dec 9, 2014 at 7:09 PM, Helin Xiang <xkee...@gmail.com> wrote:

> Hi, Jun
>
> We experienced a network device problem, which caused all our brokers to
> crash. On investigation, we found the server log throwing similar
> exceptions, like this:
>
> java.nio.channels.UnresolvedAddressException
>         at sun.nio.ch.Net.checkAddress(Net.java:29)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>
> and this:
>
> 2014-12-04 17:12:19,223 [kafka-scheduler-1] FATAL kafka.server.ReplicaManager - [Replica Manager on Broker 1]: Error writing to highwatermark file:
> java.io.FileNotFoundException: /data3/kafka-logs/replication-offset-checkpoint.tmp (Too many open files)
>         at java.io.FileOutputStream.open(Native Method)
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:145)
>         at java.io.FileWriter.<init>(FileWriter.java:73)
>         at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
>         at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:447)
>         at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:444)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
>
> We counted the occurrences of java.nio.channels.UnresolvedAddressException
> and found around 63,000. Since a healthy Kafka broker opens about 2k fds
> in our environment, we believe the number of open fds hit our system's
> limit of 65535.
>
> So it seems the bug is not fixed.
>
> After checking the code, we believe it can still leak socket fds.
> ===============================================
> Our guess:
>
> In SimpleConsumer.scala:
>
>     private def disconnect() = {
>       if(blockingChannel.isConnected) {
>         debug("Disconnecting from " + host + ":" + port)
>         blockingChannel.disconnect()
>       }
>     }
>
> But when the exception happens, blockingChannel.isConnected is false,
> because in BlockingChannel.scala:
>
>     def connect() = lock synchronized {
>       if(!connected) {
>         channel = SocketChannel.open()
>         if(readBufferSize > 0)
>           channel.socket.setReceiveBufferSize(readBufferSize)
>         if(writeBufferSize > 0)
>           channel.socket.setSendBufferSize(writeBufferSize)
>         channel.configureBlocking(true)
>         channel.socket.setSoTimeout(readTimeoutMs)
>         channel.socket.setKeepAlive(true)
>         channel.socket.setTcpNoDelay(true)
>         channel.connect(new InetSocketAddress(host, port))  // <-- exception happens here
>
>         writeChannel = channel
>         readChannel = Channels.newChannel(channel.socket().getInputStream)
>         connected = true  // <-- never reached after the exception, so connected stays false
>         ... ...
>
> Thanks.
>
> --
> Best Regards
> Xiang Helin
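The failure mode described above is easy to reproduce in isolation: SocketChannel.open() allocates a file descriptor before connect() is ever called, so a connect() that throws UnresolvedAddressException leaves that fd open unless the channel is closed explicitly. A minimal standalone sketch, using a deliberately unresolvable hostname (the .invalid TLD never resolves):

    import java.net.InetSocketAddress
    import java.nio.channels.{SocketChannel, UnresolvedAddressException}

    object FdLeakDemo extends App {
      for (_ <- 1 to 10) {
        val channel = SocketChannel.open()  // an fd is allocated here, before connect()
        try {
          // an unresolvable address makes connect() throw
          // UnresolvedAddressException, as in the stack trace above
          channel.connect(new InetSocketAddress("no-such-host.invalid", 9092))
        } catch {
          case _: UnresolvedAddressException =>
            // connect() failed, but the fd from open() is still held
        } finally {
          channel.close()  // without this, each iteration leaks one fd
        }
      }
    }

Guarding the close on an isConnected flag, as in the quoted disconnect(), skips channel.close() in exactly this case, which is why the fds accumulate until the process hits its ulimit.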