Hi Jun,

We had a network device problem that caused all of our brokers to crash. After investigating, we found exceptions like these in the server logs.
This one:

    java.nio.channels.UnresolvedAddressException
            at sun.nio.ch.Net.checkAddress(Net.java:29)
            at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
            at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
            at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
            at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
            at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
            at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
            at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)

and this one:

    2014-12-04 17:12:19,223 [kafka-scheduler-1] FATAL kafka.server.ReplicaManager - [Replica Manager on Broker 1]: Error writing to highwatermark file:
    java.io.FileNotFoundException: /data3/kafka-logs/replication-offset-checkpoint.tmp (Too many open files)
            at java.io.FileOutputStream.open(Native Method)
            at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
            at java.io.FileOutputStream.<init>(FileOutputStream.java:145)
            at java.io.FileWriter.<init>(FileWriter.java:73)
            at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
            at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:447)
            at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:444)
            at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
            at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)

We counted the occurrences of java.nio.channels.UnresolvedAddressException and found around 63,000. A healthy Kafka broker keeps about 2,000 fds open in our environment, so if each failed connect leaked one socket, that is roughly 63,000 + 2,000 = 65,000 descriptors, right at our system limit of 65,535. So it seems the bug is not fixed. After checking the code, we believe it can still leak socket fds.
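To check the claim that a failed connect leaves the socket open, here is a minimal standalone sketch (not Kafka code; `FdLeakDemo` and `attempt` are hypothetical names). It follows the same order as BlockingChannel.connect(): SocketChannel.open() allocates the fd first, then connect() throws. We assume the broker host failed to resolve during the outage, and use InetSocketAddress.createUnresolved, which never does a DNS lookup, to reproduce the same UnresolvedAddressException deterministically.

```scala
import java.net.InetSocketAddress
import java.nio.channels.{SocketChannel, UnresolvedAddressException}

object FdLeakDemo {
  // Opens a channel (allocating an fd), then attempts connect() WITHOUT
  // closing the channel on failure, mirroring the order in BlockingChannel.connect().
  // Returns the channel plus any connect error so the caller can inspect both.
  def attempt(addr: InetSocketAddress): (SocketChannel, Option[UnresolvedAddressException]) = {
    val channel = SocketChannel.open() // fd allocated here
    val error =
      try {
        channel.connect(addr) // throws UnresolvedAddressException for an unresolved host
        None
      } catch {
        case e: UnresolvedAddressException => Some(e)
      }
    (channel, error)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical host name; createUnresolved skips DNS entirely.
    val addr = InetSocketAddress.createUnresolved("broker.example", 9092)
    val (channel, error) = attempt(addr)
    println(s"connect failed: ${error.isDefined}, channel still open: ${channel.isOpen}")
    channel.close() // without this explicit close, the fd would stay allocated
  }
}
```

The exception is thrown from Net.checkAddress before any connection attempt (matching the first stack trace), and nothing in that path closes the channel, so the fd stays allocated until someone calls close().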
===============================================
Our guess:

In SimpleConsumer.scala:

    private def disconnect() = {
      if(blockingChannel.isConnected) {
        debug("Disconnecting from " + host + ":" + port)
        blockingChannel.disconnect()
      }
    }

But when the exception happens, blockingChannel.isConnected is false, because in BlockingChannel.scala:

    def connect() = lock synchronized {
      if(!connected) {
        channel = SocketChannel.open()
        if(readBufferSize > 0)
          channel.socket.setReceiveBufferSize(readBufferSize)
        if(writeBufferSize > 0)
          channel.socket.setSendBufferSize(writeBufferSize)
        channel.configureBlocking(true)
        channel.socket.setSoTimeout(readTimeoutMs)
        channel.socket.setKeepAlive(true)
        channel.socket.setTcpNoDelay(true)
        channel.connect(new InetSocketAddress(host, port))  // <-- exception happens here
        writeChannel = channel
        readChannel = Channels.newChannel(channel.socket().getInputStream)
        connected = true  // <-- connected is only set here, so it never becomes true
        ...
    ...

So disconnect() skips the close, the SocketChannel opened at the top of connect() is never closed, and every failed reconnect leaks one fd.

Thanks.

--
Best Regards,
Xiang Helin
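P.S. One possible direction for a fix, sketched under our own assumptions and not tested against Kafka itself: close the channel whenever any step of connect() throws, so a failed connect never leaves an orphaned fd. `SafeConnect` and `openAndConnect` are hypothetical names, and the `configure` callback stands in for the socket-option calls in BlockingChannel.connect().

```scala
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

object SafeConnect {
  // Hypothetical helper: opens a channel, applies socket options via `configure`,
  // connects, and closes the channel if ANY step throws before propagating the error.
  def openAndConnect(addr: InetSocketAddress,
                     configure: SocketChannel => Unit = _ => ()): SocketChannel = {
    val channel = SocketChannel.open() // fd allocated here
    try {
      configure(channel)
      channel.connect(addr)
      channel
    } catch {
      case t: Throwable =>
        channel.close() // release the fd so a failed connect cannot leak it
        throw t
    }
  }
}
```

An alternative with the same effect would be for disconnect() to close the channel whenever it is non-null, instead of relying on isConnected, which stays false after a failed connect.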