Hi Jun,

We experienced a network device problem that caused all of our brokers to crash. After investigation, we found the server logs full of exceptions like the two below.

This one:

java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:29)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)


And this one:

2014-12-04 17:12:19,223 [kafka-scheduler-1] FATAL kafka.server.ReplicaManager  - [Replica Manager on Broker 1]: Error writing to highwatermark file:
java.io.FileNotFoundException: /data3/kafka-logs/replication-offset-checkpoint.tmp (Too many open files)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:145)
        at java.io.FileWriter.<init>(FileWriter.java:73)
        at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
        at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:447)
        at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:444)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)


We counted the occurrences of java.nio.channels.UnresolvedAddressException and found around 63,000. Since a healthy Kafka broker opens about 2,000 fds in our environment, we believe the number of open fds hit our system's limit of 65,535.
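
(Roughly how such a count can be done; the log path below is just a placeholder, not our real layout:)

  import scala.io.Source

  // Count occurrences of the exception in a broker's server log.
  val count = Source.fromFile("/path/to/server.log")
    .getLines()
    .count(_.contains("java.nio.channels.UnresolvedAddressException"))
  println(count)   // came out around 63000 for us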

So it seems the bug is not fixed.

After checking the code, we believe it can still leak socket fds.
===============================================
Our guess:

In SimpleConsumer.scala:

  private def disconnect() = {
    if(blockingChannel.isConnected) {   <--  only closes the channel when connected is true
      debug("Disconnecting from " + host + ":" + port)
      blockingChannel.disconnect()
    }
  }

But when the exception happens, blockingChannel.isConnected is still false, because of this code in BlockingChannel.scala:

  def connect() = lock synchronized  {
    if(!connected) {
      channel = SocketChannel.open()
      if(readBufferSize > 0)
        channel.socket.setReceiveBufferSize(readBufferSize)
      if(writeBufferSize > 0)
        channel.socket.setSendBufferSize(writeBufferSize)
      channel.configureBlocking(true)
      channel.socket.setSoTimeout(readTimeoutMs)
      channel.socket.setKeepAlive(true)
      channel.socket.setTcpNoDelay(true)
      channel.connect(new InetSocketAddress(host, port))    <--  exception happens here

      writeChannel = channel
      readChannel = Channels.newChannel(channel.socket().getInputStream)
      connected = true    <--  never reached after the exception, so connected stays false
      ... ...
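
So when channel.connect() throws, connected stays false, disconnect() never calls blockingChannel.disconnect(), and the fd allocated by SocketChannel.open() is never closed. A minimal standalone sketch of the pattern we suspect (the host name is a made-up unresolvable one, not from our environment):

  import java.net.InetSocketAddress
  import java.nio.channels.{SocketChannel, UnresolvedAddressException}

  object FdLeakSketch extends App {
    for (_ <- 1 to 1000) {
      val channel = SocketChannel.open()   // an fd is allocated here
      try {
        // an unresolvable host makes connect() throw before the channel connects
        channel.connect(new InetSocketAddress("no-such-host.invalid", 9092))
      } catch {
        case _: UnresolvedAddressException =>
          // nothing closes `channel`, so its fd leaks, same as in BlockingChannel
      }
    }
    // watching ls /proc/<pid>/fd while this runs shows the count climbing
  }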

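One possible fix (just our sketch, not an official patch) is to close the half-open channel whenever connect() fails, e.g.:

  def connect() = lock synchronized  {
    if(!connected) {
      try {
        channel = SocketChannel.open()
        // ... buffer sizes, blocking mode and socket options as before ...
        channel.connect(new InetSocketAddress(host, port))
        writeChannel = channel
        readChannel = Channels.newChannel(channel.socket().getInputStream)
        connected = true
      } catch {
        case e: Throwable =>
          // release the fd of the half-open channel, then rethrow
          if (channel != null) {
            channel.close()
            channel = null
          }
          throw e
      }
    }
  }

Alternatively, the disconnect path could check whether the underlying channel is non-null instead of relying on the connected flag.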

Thanks.


-- 


Best Regards,
Xiang Helin
