Ah, makes sense. It seems that this is already fixed in 0.8.2 and trunk.

Thanks,

Jun

On Fri, Dec 12, 2014 at 5:34 PM, Helin Xiang <xkee...@gmail.com> wrote:

> Hi, Jun
>
> What you said is right, but in the SimpleConsumer code (where
> BlockingChannel.disconnect() is called), it first checks whether the
> channel is connected; that is the real problem.
>
> We reproduced the problem in our testing environment:
> first, we used iptables to drop packets and reset the TCP connection
> to/from the other broker;
> second, we used iptables to drop packets to/from the DNS server.
>
> The broker then crashed immediately.
>
> After we changed the code, it no longer crashes.
>
> =========
>
> --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
> +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
> @@ -46,10 +46,10 @@ class SimpleConsumer(val host: String,
>    }
>
>    private def disconnect() = {
> -    if(blockingChannel.isConnected) {
> +    //if(blockingChannel.isConnected) {
>        debug("Disconnecting from " + host + ":" + port)
>        blockingChannel.disconnect()
> -    }
> +    //}
>    }
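
[Editor's sketch] The diff above simply comments out the isConnected guard. Another shape the fix could take (a hedged, self-contained sketch, not the actual Kafka patch; the object name, host, and port below are hypothetical) is to have connect() itself release the socket when the connect attempt fails, so the fd is freed even though `connected` never became true:

```scala
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

// Hypothetical sketch, not the actual Kafka patch: release the fd inside
// connect() when the connect attempt fails, instead of relying on a
// flag-guarded disconnect() that never fires.
object ConnectCleanupSketch {
  private var channel: SocketChannel = null
  private var connected = false

  def connect(host: String, port: Int): Unit = {
    if (!connected) {
      channel = SocketChannel.open() // fd is allocated here
      try {
        // may throw UnresolvedAddressException when DNS is unavailable
        channel.connect(new InetSocketAddress(host, port))
        connected = true
      } catch {
        case e: Throwable =>
          channel.close() // free the fd even though connected is still false
          throw e
      }
    }
  }

  def isChannelOpen: Boolean = channel != null && channel.isOpen
}
```

With this shape, a failed connect() leaves no open channel behind, and the guard in disconnect() can stay as it is.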
>
>
> On Sat, Dec 13, 2014 at 1:23 AM, Jun Rao <j...@confluent.io> wrote:
> >
> > Hmm, but if we hit an exception in BlockingChannel.connect(), we will
> > call BlockingChannel.disconnect(), which will close the socket channel.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Dec 9, 2014 at 7:09 PM, Helin Xiang <xkee...@gmail.com> wrote:
> >
> > > Hi, Jun
> > >
> > > We experienced a network device problem that caused all our brokers to
> > > crash. After investigation, we found the server logs throwing similar
> > > exceptions.
> > >
> > > this:
> > >
> > > java.nio.channels.UnresolvedAddressException
> > >         at sun.nio.ch.Net.checkAddress(Net.java:29)
> > >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
> > >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > >
> > >
> > > and this:
> > >
> > > 2014-12-04 17:12:19,223 [kafka-scheduler-1] FATAL
> > > kafka.server.ReplicaManager  - [Replica Manager on Broker 1]: Error
> > > writing to highwatermark file:
> > > java.io.FileNotFoundException:
> > > /data3/kafka-logs/replication-offset-checkpoint.tmp (Too many open files)
> > >         at java.io.FileOutputStream.open(Native Method)
> > >         at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
> > >         at java.io.FileOutputStream.<init>(FileOutputStream.java:145)
> > >         at java.io.FileWriter.<init>(FileWriter.java:73)
> > >         at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
> > >         at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:447)
> > >         at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:444)
> > >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
> > >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> > >
> > >
> > > We counted the occurrences of java.nio.channels.UnresolvedAddressException
> > > and found around 63,000. Since a healthy Kafka broker opens about 2k fds
> > > in our environment, we believe the open fds hit our system's limit of
> > > 65535.
> > >
> > > So it seems the bug is not fixed.
> > >
> > > After checking the code, we believe it would still leak socket fds.
> > > ===============================================
> > > Our guess:
> > >
> > > In SimpleConsumer.scala:
> > >
> > >   private def disconnect() = {
> > >     if(blockingChannel.isConnected) {
> > >       debug("Disconnecting from " + host + ":" + port)
> > >       blockingChannel.disconnect()
> > >     }
> > >   }
> > >
> > > But when the exception happens, blockingChannel.isConnected will be
> > > false, because in BlockingChannel.scala:
> > >
> > >   def connect() = lock synchronized  {
> > >     if(!connected) {
> > >       channel = SocketChannel.open()
> > >       if(readBufferSize > 0)
> > >         channel.socket.setReceiveBufferSize(readBufferSize)
> > >       if(writeBufferSize > 0)
> > >         channel.socket.setSendBufferSize(writeBufferSize)
> > >       channel.configureBlocking(true)
> > >       channel.socket.setSoTimeout(readTimeoutMs)
> > >       channel.socket.setKeepAlive(true)
> > >       channel.socket.setTcpNoDelay(true)
> > >       channel.connect(new InetSocketAddress(host, port))   <-- exception happens here
> > >
> > >       writeChannel = channel
> > >       readChannel = Channels.newChannel(channel.socket().getInputStream)
> > >       connected = true   <-- never reached, so connected has no chance to become true
> > >       ... ...
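
[Editor's sketch] The interaction described above can be boiled down to a small self-contained illustration (hypothetical class and host names; it mirrors, but is not, the Kafka code): the channel's fd exists as soon as SocketChannel.open() returns, while `connected` only becomes true after connect() succeeds, so a flag-guarded disconnect() leaves the fd open forever:

```scala
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

// Self-contained illustration of the leak (hypothetical class, not the real
// SimpleConsumer/BlockingChannel): the fd is allocated by SocketChannel.open(),
// but `connected` is only set after connect() succeeds, so the guarded
// disconnect() never closes a channel whose connect() threw.
class LeakyChannelSketch(host: String, port: Int) {
  private var channel: SocketChannel = null
  private var connected = false

  def connect(): Unit = {
    channel = SocketChannel.open()                      // fd allocated here
    channel.connect(new InetSocketAddress(host, port))  // may throw; connected stays false
    connected = true                                    // never reached on failure
  }

  // Mirrors the guarded SimpleConsumer.disconnect(): a channel whose
  // connect() failed is never closed -- the fd leaks.
  def disconnect(): Unit = {
    if (connected) {
      channel.close()
      connected = false
    }
  }

  // Unconditional variant: close whatever was opened.
  def disconnectAlways(): Unit = {
    if (channel != null) channel.close()
    connected = false
  }

  def isChannelOpen: Boolean = channel != null && channel.isOpen
}
```

Calling connect() against an unresolvable host throws UnresolvedAddressException; the guarded disconnect() then leaves the channel open (the leaked fd), while the unconditional variant closes it.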
> > >
> > >
> > > Thanks.
> > >
> > >
> > > --
> > >
> > >
> > > *Best Regards*
> > > Xiang Helin
> > >
> >
>
>
> --
>
>
> *Best Regards*
> Xiang Helin (向河林)
>
