Hi, Jun

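What you said is right, but in SimpleConsumer (where BlockingChannel.disconnect()
is called), disconnect() first checks whether the channel is connected, and
that check is the real problem: after a failed connect(), isConnected is false,
so BlockingChannel.disconnect() is never called and the socket stays open.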
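To see the leak without Kafka, here is a minimal Scala sketch. It assumes, as the first stack trace further down suggests, that SocketChannel.connect() rejects an unresolved address by throwing UnresolvedAddressException before it tries to connect, and that it does not close the channel when this happens. InetSocketAddress.createUnresolved stands in for a failed DNS lookup. LeakDemo, tryConnect, "broker.invalid" and port 9092 are made-up names for illustration only.

```scala
import java.net.InetSocketAddress
import java.nio.channels.{SocketChannel, UnresolvedAddressException}

// Hypothetical demo object; mirrors the first steps of BlockingChannel.connect().
object LeakDemo {
  // Opens a channel and tries to connect it to an unresolved address.
  // Returns the channel (so the caller can inspect it) and the failure, if any.
  def tryConnect(host: String, port: Int): (SocketChannel, Option[Throwable]) = {
    val channel = SocketChannel.open()
    try {
      // createUnresolved skips DNS, simulating the lookup failure we saw.
      channel.connect(InetSocketAddress.createUnresolved(host, port))
      (channel, None)
    } catch {
      case e: UnresolvedAddressException =>
        // Nothing closes the channel here, so its fd stays allocated.
        (channel, Some(e))
    }
  }
}
```

If that assumption holds, tryConnect hands back a channel that is still open but not connected. This is exactly the state that SimpleConsumer.disconnect() skips because of its isConnected check, so each retry leaks one fd.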
We reproduced the problem in our test environment:
first, we used iptables to drop packets and reset TCP connections to/from the
other broker; second, we used iptables to drop packets to/from the DNS server.

Then the broker crashed immediately.

After we changed the code as below, it no longer crashes.

=========

--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -46,10 +46,10 @@ class SimpleConsumer(val host: String,
   }

   private def disconnect() = {
-    if(blockingChannel.isConnected) {
+    //if(blockingChannel.isConnected) {
       debug("Disconnecting from " + host + ":" + port)
       blockingChannel.disconnect()
-    }
+    //}
   }
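Commenting out the check relies on BlockingChannel.disconnect() closing the socket even when connected is false. Another option, sketched below under the assumption that changing BlockingChannel is acceptable, is to clean up inside connect() itself. Then an exception can never leave an open socket behind while connected is false. GuardedChannel, hasOpenSocket and the constructor that takes an InetSocketAddress are hypothetical. This is a stripped-down stand-in, not Kafka's BlockingChannel.

```scala
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel
import scala.util.control.NonFatal

// Hypothetical, stripped-down stand-in for BlockingChannel (not Kafka's code).
class GuardedChannel(address: InetSocketAddress) {
  private val lock = new Object
  private var channel: SocketChannel = null
  private var connected = false

  def isConnected: Boolean = lock.synchronized { connected }

  // Hypothetical helper so callers can check that no socket was left open.
  def hasOpenSocket: Boolean = lock.synchronized { channel != null && channel.isOpen }

  def connect(): Unit = lock.synchronized {
    if (!connected) {
      channel = SocketChannel.open()
      try {
        // Throws UnresolvedAddressException (unchecked) if DNS failed,
        // or an IOException if the TCP connect itself fails.
        channel.connect(address)
        connected = true
      } catch {
        case NonFatal(e) =>
          disconnect() // release the fd before propagating the failure
          throw e
      }
    }
  }

  // Closes whatever connect() opened, whether or not it completed.
  def disconnect(): Unit = lock.synchronized {
    if (channel != null) {
      try channel.close()
      catch { case _: IOException => () } // nothing useful to do if close fails
      channel = null
    }
    connected = false
  }
}
```

With connect() shaped like this, the isConnected check in SimpleConsumer.disconnect() could presumably stay as it is, because a failed connect() would already have released the fd. The monitor on lock is reentrant, so calling disconnect() from inside connect() does not deadlock.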


On Sat, Dec 13, 2014 at 1:23 AM, Jun Rao <j...@confluent.io> wrote:
>
> Hmm, but if we hit an exception in BlockingChannel.connect(), we will
> call BlockingChannel.disconnect(), which will close the socket channel.
>
> Thanks,
>
> Jun
>
> On Tue, Dec 9, 2014 at 7:09 PM, Helin Xiang <xkee...@gmail.com> wrote:
>
> > Hi, Jun
> >
> > We experienced a network device problem, and it caused all brokers to crash.
> > After investigating, we found that the server logs contained exceptions
> > like this one:
> >
> > java.nio.channels.UnresolvedAddressException
> >         at sun.nio.ch.Net.checkAddress(Net.java:29)
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >
> >
> > and this:
> >
> > 2014-12-04 17:12:19,223 [kafka-scheduler-1] FATAL kafka.server.ReplicaManager  - [Replica Manager on Broker 1]: Error writing to highwatermark file:
> > java.io.FileNotFoundException: /data3/kafka-logs/replication-offset-checkpoint.tmp (Too many open files)
> >         at java.io.FileOutputStream.open(Native Method)
> >         at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
> >         at java.io.FileOutputStream.<init>(FileOutputStream.java:145)
> >         at java.io.FileWriter.<init>(FileWriter.java:73)
> >         at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
> >         at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:447)
> >         at kafka.server.ReplicaManager$$anonfun$checkpointHighWatermarks$2.apply(ReplicaManager.scala:444)
> >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> >
> >
> > We counted the java.nio.channels.UnresolvedAddressException occurrences and
> > found around 63,000 of them. Since a healthy Kafka broker opens about 2k fds
> > in our environment, we believe the number of open fds hit our system's
> > limit of 65535.
> >
> > So it seems the bug is not fixed.
> >
> > After checking the code, we believe it can still leak socket fds.
> > ===============================================
> > Our guess:
> >
> > In SimpleConsumer.scala:
> >
> >   private def disconnect() = {
> >     if(blockingChannel.isConnected) {
> >       debug("Disconnecting from " + host + ":" + port)
> >       blockingChannel.disconnect()
> >     }
> >   }
> >
> > But when the exception happens, blockingChannel.isConnected is still
> > false, because in BlockingChannel.scala:
> >
> >   def connect() = lock synchronized  {
> >     if(!connected) {
> >       channel = SocketChannel.open()
> >       if(readBufferSize > 0)
> >         channel.socket.setReceiveBufferSize(readBufferSize)
> >       if(writeBufferSize > 0)
> >         channel.socket.setSendBufferSize(writeBufferSize)
> >       channel.configureBlocking(true)
> >       channel.socket.setSoTimeout(readTimeoutMs)
> >       channel.socket.setKeepAlive(true)
> >       channel.socket.setTcpNoDelay(true)
> >       channel.connect(new InetSocketAddress(host, port))    // <-- exception thrown here
> >
> >       writeChannel = channel
> >       readChannel = Channels.newChannel(channel.socket().getInputStream)
> >       connected = true   // <-- never reached, so connected stays false
> >       ... ...
> >
> >
> > Thanks.
> >
> >
> > --
> >
> >
> > Best Regards,
> > Xiang Helin
> >
>


-- 


Best Regards,
向河林
