Ah, that makes sense. It seems this is already fixed in 0.8.2 and trunk.

Thanks,
Jun

On Fri, Dec 12, 2014 at 5:34 PM, Helin Xiang <xkee...@gmail.com> wrote:

> Hi, Jun
>
> What you said is right, but in the SimpleConsumer code (where
> BlockingChannel.disconnect() is called), it first checks whether the
> channel is connected; that is the real problem.
>
> And we reproduced the problem in our testing environment:
> first, we used iptables to drop packets and reset TCP links to/from the
> other broker;
> second, we used iptables to drop packets to/from the DNS server.
>
> The broker then crashed immediately.
>
> After we changed the code, it no longer crashes.
>
> =========
>
> --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
> +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
> @@ -46,10 +46,10 @@ class SimpleConsumer(val host: String,
>    }
>
>    private def disconnect() = {
> -    if(blockingChannel.isConnected) {
> +    //if(blockingChannel.isConnected) {
>        debug("Disconnecting from " + host + ":" + port)
>        blockingChannel.disconnect()
> -    }
> +    //}
>    }
>
>
> On Sat, Dec 13, 2014 at 1:23 AM, Jun Rao <j...@confluent.io> wrote:
> >
> > Hmm, but if we hit an exception in BlockingChannel.connect(), we will
> > call BlockingChannel.disconnect(), which will close the socket channel.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Dec 9, 2014 at 7:09 PM, Helin Xiang <xkee...@gmail.com> wrote:
> > >
> > > Hi, Jun
> > >
> > > We experienced a network device problem that caused all brokers to
> > > crash. After investigation, we found that the server log threw
> > > similar exceptions.
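The interaction discussed above can be sketched without real sockets: if cleanup is guarded by a `connected` flag that is only set after `connect()` fully succeeds, a channel that throws mid-connect is never closed. A minimal, self-contained sketch of that pattern (`FlaggedChannel` and its methods are illustrative names, not Kafka's actual classes):

```scala
// Sketch of the leak: a resource opened before "connected" becomes true
// is never released if cleanup is guarded by that same flag.
class FlaggedChannel(failConnect: Boolean) {
  var opened = false     // stands in for the underlying SocketChannel fd
  var connected = false  // only set after connect() fully succeeds

  def connect(): Unit = {
    opened = true        // fd is allocated here
    if (failConnect)
      throw new RuntimeException("UnresolvedAddressException (simulated)")
    connected = true     // never reached when connect fails
  }

  // Buggy shape: mirrors a disconnect() that checks isConnected first
  def disconnectGuarded(): Unit =
    if (connected) { opened = false; connected = false }

  // Fixed shape: close unconditionally, as in the patch above
  def disconnectAlways(): Unit = { opened = false; connected = false }
}

object LeakDemo extends App {
  val leaky = new FlaggedChannel(failConnect = true)
  try leaky.connect()
  catch { case _: RuntimeException => leaky.disconnectGuarded() }
  println(s"guarded cleanup leaves fd open: ${leaky.opened}")       // true: leaked

  val fixed = new FlaggedChannel(failConnect = true)
  try fixed.connect()
  catch { case _: RuntimeException => fixed.disconnectAlways() }
  println(s"unconditional cleanup leaves fd open: ${fixed.opened}") // false: released
}
```

Each failed reconnect in this shape leaks one fd, which matches the reported pattern of ~63000 `UnresolvedAddressException` occurrences preceding "Too many open files".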
> > > this:
> > >
> > > java.nio.channels.UnresolvedAddressException
> > >         at sun.nio.ch.Net.checkAddress(Net.java:29)
> > >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
> > >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > >
> > > and this:
> > >
> > > 2014-12-04 17:12:19,223 [kafka-scheduler-1] FATAL kafka.server.ReplicaManager - [Replica Manager on Broker 1]: Error writing to highwatermark file:
> > > java.io.FileNotFoundException: /data3/kafka-logs/replication-offset-checkpoint.tmp (Too many open files)
> > >         at java.io.FileOutputStream.open(Native Method)
> > >         at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
> > >         at java.io.FileOutputStream.<init>(FileOutputStream.java:145)
> > >         at java.io.FileWriter.<init>(FileWriter.java:73)
> > >         at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
> > >         at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:447)
> > >         at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:444)
> > >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
> > >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> > >
> > > We counted the occurrences of java.nio.channels.UnresolvedAddressException
> > > and found around 63000. Since a healthy Kafka broker opens about 2k fds
> > > in our environment, we believe the number of open fds hit our system's
> > > limit of 65535.
> > >
> > > So it seems the bug is not fixed.
> > >
> > > After checking the code, we believe it can still leak socket fds.
> > > ===============================================
> > > Our guess:
> > >
> > > In SimpleConsumer.scala:
> > >
> > >   private def disconnect() = {
> > >     if(blockingChannel.isConnected) {
> > >       debug("Disconnecting from " + host + ":" + port)
> > >       blockingChannel.disconnect()
> > >     }
> > >   }
> > >
> > > But when the exception happens, blockingChannel.isConnected is false,
> > > because in BlockingChannel.scala:
> > >
> > >   def connect() = lock synchronized {
> > >     if(!connected) {
> > >       channel = SocketChannel.open()
> > >       if(readBufferSize > 0)
> > >         channel.socket.setReceiveBufferSize(readBufferSize)
> > >       if(writeBufferSize > 0)
> > >         channel.socket.setSendBufferSize(writeBufferSize)
> > >       channel.configureBlocking(true)
> > >       channel.socket.setSoTimeout(readTimeoutMs)
> > >       channel.socket.setKeepAlive(true)
> > >       channel.socket.setTcpNoDelay(true)
> > >       channel.connect(new InetSocketAddress(host, port))   <-- exception happens here
> > >
> > >       writeChannel = channel
> > >       readChannel = Channels.newChannel(channel.socket().getInputStream)
> > >       connected = true   <-- set here, so it never has a chance to become true
> > >       ... ...
> > >
> > > Thanks.
> > >
> > >
> > > --
> > > *Best Regards*
> > > Xiang Helin
> >
>
>
> --
> *Best Regards*
> 向河林
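To make the fd lifecycle in that connect() walkthrough concrete: an alternative to removing the isConnected guard in disconnect() is to make connect() itself responsible for cleanup, closing the half-opened channel before rethrowing. A sketch of that shape only; `SafeConnect`/`connectSafely` are illustrative names, and this is not necessarily the patch actually applied in 0.8.2/trunk:

```scala
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

object SafeConnect {
  // Close the half-opened SocketChannel if any step of connect() throws,
  // so its fd is released even though a "connected" flag was never set.
  def connectSafely(host: String, port: Int): SocketChannel = {
    val channel = SocketChannel.open()   // fd is allocated here
    try {
      channel.configureBlocking(true)
      channel.socket.setKeepAlive(true)
      channel.socket.setTcpNoDelay(true)
      // throws UnresolvedAddressException when DNS cannot resolve the host
      channel.connect(new InetSocketAddress(host, port))
      channel
    } catch {
      case e: Throwable =>
        channel.close()                  // release the fd before propagating
        throw e
    }
  }
}
```

With this shape, the DNS outage reproduced above would still make each reconnect attempt fail, but every attempt would also release its socket fd, so the count could not climb toward the 65535 limit.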