Hmm, but if we hit an exception in BlockingChannel.connect(), we will
call BlockingChannel.disconnect(), which will close the socket channel.
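
For reference, here is a minimal stand-alone sketch of that cleanup path
(hypothetical code, not the actual Kafka source). The idea is that connect()
invokes disconnect() on failure, and disconnect() keys off the channel itself
rather than the connected flag, so a half-opened SocketChannel is still
closed:

import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

class GuardedChannel(host: String, port: Int) {
  private var channel: SocketChannel = null
  private var connected = false

  def connect(): Unit = synchronized {
    if (!connected) {
      channel = SocketChannel.open()   // fd is allocated here, before connect()
      try {
        channel.connect(new InetSocketAddress(host, port))  // may throw
        connected = true
      } catch {
        case e: Throwable =>
          disconnect()   // close the channel even though connected is still false
          throw e
      }
    }
  }

  def disconnect(): Unit = synchronized {
    if (channel != null) {   // keyed on the channel, not the connected flag
      channel.close()
      channel = null
    }
    connected = false
  }
}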

Thanks,

Jun

On Tue, Dec 9, 2014 at 7:09 PM, Helin Xiang <xkee...@gmail.com> wrote:

> Hi, Jun
>
> We experienced a network device failure that caused all of our brokers to
> crash. After investigation, we found that the server logs were throwing
> exceptions like
>
> this:
>
> java.nio.channels.UnresolvedAddressException
>         at sun.nio.ch.Net.checkAddress(Net.java:29)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>
>
> and this:
>
> 2014-12-04 17:12:19,223 [kafka-scheduler-1] FATAL kafka.server.ReplicaManager  - [Replica Manager on Broker 1]: Error writing to highwatermark file:
> java.io.FileNotFoundException: /data3/kafka-logs/replication-offset-checkpoint.tmp (Too many open files)
>         at java.io.FileOutputStream.open(Native Method)
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:145)
>         at java.io.FileWriter.<init>(FileWriter.java:73)
>         at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
>         at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:447)
>         at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:444)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
>
>
> We counted the occurrences of java.nio.channels.UnresolvedAddressException
> and found around 63,000 of them. A healthy Kafka broker holds about 2,000
> open fds in our environment, and 63,000 leaked sockets plus that baseline
> comes to roughly 65,000, so we believe the open fds hit our system's limit
> of 65,535.
>
> So it seems the bug is not fixed.
>
> After checking the code, we believe it can still leak socket fds.
> ===============================================
> Our guess:
>
> In SimpleConsumer.scala:
>
>   private def disconnect() = {
>     if(blockingChannel.isConnected) {
>       debug("Disconnecting from " + host + ":" + port)
>       blockingChannel.disconnect()
>     }
>   }
>
> But when the exception happens, blockingChannel.isConnected is still false,
> because in BlockingChannel.scala:
>
>   def connect() = lock synchronized  {
>     if(!connected) {
>       channel = SocketChannel.open()
>       if(readBufferSize > 0)
>         channel.socket.setReceiveBufferSize(readBufferSize)
>       if(writeBufferSize > 0)
>         channel.socket.setSendBufferSize(writeBufferSize)
>       channel.configureBlocking(true)
>       channel.socket.setSoTimeout(readTimeoutMs)
>       channel.socket.setKeepAlive(true)
>       channel.socket.setTcpNoDelay(true)
>       channel.connect(new InetSocketAddress(host, port))   <-- exception happens here
>
>       writeChannel = channel
>       readChannel = Channels.newChannel(channel.socket().getInputStream)
>       connected = true   <-- this line is never reached, so connected stays false
>       ... ...
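>
> A stand-alone sketch of this failure mode (hypothetical code, not from the
> Kafka tree): each failed connect orphans the SocketChannel opened just
> before it, because the flag that guards the cleanup is never set.
>
> import java.io.File
> import java.net.InetSocketAddress
> import java.nio.channels.{SocketChannel, UnresolvedAddressException}
>
> object FdLeakRepro extends App {
>   for (_ <- 1 to 100) {
>     var connected = false
>     val channel = SocketChannel.open()            // fd allocated here
>     try {
>       // an unresolvable host triggers the same exception as above
>       channel.connect(new InetSocketAddress("no-such-host.invalid", 9092))
>       connected = true
>     } catch {
>       case _: UnresolvedAddressException =>
>         if (connected)                            // mirrors the isConnected guard;
>           channel.close()                         // never taken, so the fd leaks
>     }
>   }
>   // Linux-only: count this process's open fds; expect ~100 extra
>   println(new File("/proc/self/fd").list().length)
> }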
>
>
> Thanks.
>
>
> --
>
> Best Regards,
> Xiang Helin
>
