Ewen, you are right, the patch was committed on Feb. 20th last year. I will
leave a comment and close that ticket.

On Tue, Jan 27, 2015 at 7:24 PM, Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that
> will only be included in 0.8.2.
>
> Guozhang, it looks like you wrote the patch, Jun reviewed it, but the bug
> is still open and there's a comment that moved it to 0.9 after the commit
> was already made. Was the commit a mistake or did we just forget to close
> it?
>
> On Tue, Jan 27, 2015 at 10:20 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Here is the relevant stack trace:
> >
> > java.nio.channels.UnresolvedAddressException: null
> >
> >         at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]
> >
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) ~[na:1.7.0_55]
> >
> >         at kafka.network.BlockingChannel.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> >         at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> >
> > On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > I am using 0.8.1. The source is here:
> > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
> > >
> > > Here is the definition of disconnect():
> > >
> > >   private def disconnect() = {
> > >     if(blockingChannel.isConnected) {
> > >       debug("Disconnecting from " + host + ":" + port)
> > >       blockingChannel.disconnect()
> > >     }
> > >   }
> > >
> > > It checks blockingChannel.isConnected before calling
> > > blockingChannel.disconnect(). I think that if connect() throws an
> > > UnresolvedAddressException, isConnected is never set, so
> > > blockingChannel.disconnect() is never called. But by that point we have
> > > already created a socket, and we leak it.
> > >
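> > > To make the leak concrete, here is a minimal, hypothetical Java sketch (not
> > > Kafka code) of what happens underneath: SocketChannel.open() allocates the
> > > file descriptor before connect() is ever called, so when connect() throws
> > > UnresolvedAddressException the channel is still open and has to be closed
> > > explicitly.
> > >
> > > import java.net.InetSocketAddress;
> > > import java.nio.channels.SocketChannel;
> > > import java.nio.channels.UnresolvedAddressException;
> > >
> > > public class LeakDemo {
> > >   public static void main(String[] args) throws Exception {
> > >     SocketChannel channel = SocketChannel.open();  // FD is allocated here
> > >     try {
> > >       // Throws UnresolvedAddressException for a garbage host name; the
> > >       // channel is never marked connected but it stays open.
> > >       channel.connect(new InetSocketAddress("no-such-host.invalid", 9092));
> > >     } catch (UnresolvedAddressException e) {
> > >       // Without this close() the FD leaks, which is exactly what the
> > >       // guarded disconnect() above misses when isConnected is false.
> > >       channel.close();
> > >     }
> > >   }
> > > }
> > >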
> > > The same problem might be present in the connect method of the
> > > BlockingChannel at
> > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala.
> > > Its own disconnect method, though, seems to check both connected and
> > > whether channel is null:
> > >
> > >   def disconnect() = lock synchronized {
> > >     // My comment: connected will not be set if we get an
> > >     // UnresolvedAddressException, but channel should NOT be null, so we
> > >     // will probably still do the right thing.
> > >     if(connected || channel != null) {
> > >       // closing the main socket channel *should* close the read channel
> > >       // but let's do it to be sure.
> > >       swallow(channel.close())
> > >       swallow(channel.socket.close())
> > >       if(readChannel != null) swallow(readChannel.close())
> > >       channel = null; readChannel = null; writeChannel = null
> > >       connected = false
> > >     }
> > >   }
> > >
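> > > Since that guarded disconnect() in SimpleConsumer never runs when connect()
> > > throws, another option would be for the connect path itself to release the
> > > channel on failure. A hypothetical Java sketch of that shape (not the actual
> > > KAFKA-1235 patch, just the general pattern):
> > >
> > > import java.io.IOException;
> > > import java.net.InetSocketAddress;
> > > import java.nio.channels.SocketChannel;
> > >
> > > final class SafeConnect {
> > >   // If connect() fails for any reason, resolved or not, close the channel
> > >   // before rethrowing so the file descriptor is never orphaned.
> > >   static SocketChannel connectOrClose(String host, int port) throws IOException {
> > >     SocketChannel channel = SocketChannel.open();
> > >     try {
> > >       channel.connect(new InetSocketAddress(host, port));
> > >       return channel;
> > >     } catch (RuntimeException | IOException e) {
> > >       channel.close();
> > >       throw e;
> > >     }
> > >   }
> > > }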
> > >
> > >
> > > On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > >> Rajiv,
> > >>
> > >> Which version of Kafka are you using? I just checked SimpleConsumer's
> > >> code, and in its close() function, disconnect() is called, which will
> > >> close the socket.
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com>
> > >> wrote:
> > >>
> > >> > Meant to write a run loop.
> > >> >
> > >> > void run() {
> > >> >   while (running) {
> > >> >     if (simpleConsumer == null) {
> > >> >       simpleConsumer = new SimpleConsumer(host, port,
> > >> >           (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
> > >> >     }
> > >> >     try {
> > >> >       // Do stuff with simpleConsumer.
> > >> >     } catch (Exception e) {
> > >> >       logger.error(e);  // Assume UnresolvedAddressException.
> > >> >       if (simpleConsumer != null) {
> > >> >         simpleConsumer.close();
> > >> >         simpleConsumer = null;
> > >> >       }
> > >> >     }
> > >> >   }
> > >> > }
> > >> >
> > >> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com>
> > >> > wrote:
> > >> >
> > >> > > Here is my typical flow:
> > >> > >
> > >> > > void run() {
> > >> > >   if (simpleConsumer == null) {
> > >> > >     simpleConsumer = new SimpleConsumer(host, port,
> > >> > >         (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
> > >> > >   }
> > >> > >   try {
> > >> > >     // Do stuff with simpleConsumer.
> > >> > >   } catch (Exception e) {
> > >> > >     if (simpleConsumer != null) {
> > >> > >       simpleConsumer.close();
> > >> > >       simpleConsumer = null;
> > >> > >     }
> > >> > >   }
> > >> > > }
> > >> > >
> > >> > > If there is a problem with the host name, or some DNS issue, we get an
> > >> > > UnresolvedAddressException as expected and attempt to close the
> > >> > > simpleConsumer. However, this does not actually get rid of the
> > >> > > underlying socket, so we end up leaking an FD every time it happens.
> > >> > > Though this is not a common case, I think there needs to be a way on
> > >> > > the SimpleConsumer to get rid of all OS resources it is holding onto.
> > >> > > Right now, if this keeps happening, the number of FDs consumed by the
> > >> > > process keeps growing until we hit the OS limit. As a user I cannot do
> > >> > > anything other than call simpleConsumer.close(). We need to be able to
> > >> > > close the underlying socketChannel/socket when this kind of error
> > >> > > happens.
> > >> > >
> > >> > > To reproduce, one can just run this code with any garbage host name;
> > >> > > running lsof -p on the process while it runs will show the number of
> > >> > > open FDs increasing without limit.
> > >> > >
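> > >> > > In the meantime, one user-level mitigation (just a sketch, assuming the
> > >> > > extra lookup is acceptable) is to resolve the host ourselves before
> > >> > > constructing the SimpleConsumer, so a bad name fails fast without a
> > >> > > socket ever being opened:
> > >> > >
> > >> > > import java.net.InetAddress;
> > >> > > import java.net.UnknownHostException;
> > >> > >
> > >> > > final class HostCheck {
> > >> > >   // Returns true only if the name resolves; calling this before
> > >> > >   // "new SimpleConsumer(...)" keeps a DNS problem from ever reaching
> > >> > >   // BlockingChannel.connect(), so no socket FD is created.
> > >> > >   static boolean resolves(String host) {
> > >> > >     try {
> > >> > >       InetAddress.getByName(host);
> > >> > >       return true;
> > >> > >     } catch (UnknownHostException e) {
> > >> > >       return false;
> > >> > >     }
> > >> > >   }
> > >> > > }
> > >> > >
> > >> > > This only narrows the window, since DNS can still fail between the
> > >> > > check and the connect, but it keeps the common misconfiguration case
> > >> > > from eating FDs.
> > >> > >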
> > >> > > Thanks,
> > >> > > Rajiv
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>



-- 
-- Guozhang
