I am using 0.8.1. The source is here:
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala

Here is the definition of disconnect():
private def disconnect() = {
  if(blockingChannel.isConnected) {
    debug("Disconnecting from " + host + ":" + port)
    blockingChannel.disconnect()
  }
}
It checks blockingChannel.isConnected before calling
blockingChannel.disconnect(). I think that if connect() throws an
UnresolvedAddressException, the connected flag is never set, so
blockingChannel.disconnect() is never called. But by that point we have
already created a socket, and it leaks.
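
A minimal sketch of the change I have in mind (my own sketch against the
0.8.1 source linked above, not shipped code): drop the isConnected guard
and delegate unconditionally, since BlockingChannel.disconnect() below
also checks channel != null and so is safe to call on a half-opened
channel:

private def disconnect() = {
  // Sketch only: no isConnected guard, so a channel left half-open by a
  // failed connect() (e.g. an UnresolvedAddressException) still gets
  // closed by BlockingChannel.disconnect()'s own
  // (connected || channel != null) check.
  debug("Disconnecting from " + host + ":" + port)
  blockingChannel.disconnect()
}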

The same problem might be present in the connect() method of
BlockingChannel at
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala.
Its own disconnect() method, though, does check both the connected flag
and whether the channel is null:

def disconnect() = lock synchronized {
  // My comment: connected will not be set if we get an
  // UnresolvedAddressException, but channel should NOT be null, so we
  // will probably still do the right thing.
  if(connected || channel != null) {
    // closing the main socket channel *should* close the read channel
    // but let's do it to be sure.
    swallow(channel.close())
    swallow(channel.socket.close())
    if(readChannel != null) swallow(readChannel.close())
    channel = null; readChannel = null; writeChannel = null
    connected = false
  }
}
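
Along the same lines, here is a rough sketch (again mine, not an existing
patch) of an exception-safe connect() for BlockingChannel: wrap the body
in a try/catch so that if anything after SocketChannel.open() throws,
disconnect() runs before the exception is rethrown. The elided lines
stand in for the option and channel setup in the real 0.8.1 code:

def connect() = lock synchronized {
  if(!connected) {
    try {
      channel = SocketChannel.open()
      // ... socket options / blocking-mode setup as in the real code ...
      channel.connect(new InetSocketAddress(host, port))
      // ... read/write channel setup as in the real code ...
      connected = true
    } catch {
      // An UnresolvedAddressException (or anything else thrown
      // mid-connect) lands here; disconnect() closes the channel we just
      // opened instead of leaking its FD.
      case e: Throwable =>
        disconnect()
        throw e
    }
  }
}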



On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Rajiv,
>
> Which version of Kafka are you using? I just checked SimpleConsumer's code,
> and in its close() function, disconnect() is called, which will close the
> socket.
>
> Guozhang
>
>
> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Meant to write a run loop.
> >
> > void run() {
> >   while (running) {
> >     if (simpleConsumer == null) {
> >       simpleConsumer = new SimpleConsumer(host, port,
> >           (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
> >     }
> >     try {
> >       // Do stuff with simpleConsumer.
> >     } catch (Exception e) {
> >       logger.error(e);  // Assume UnresolvedAddressException.
> >       if (simpleConsumer != null) {
> >         simpleConsumer.close();
> >         simpleConsumer = null;
> >       }
> >     }
> >   }
> > }
> >
> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Here is my typical flow:
> > > void run() {
> > >   if (simpleConsumer == null) {
> > >     simpleConsumer = new SimpleConsumer(host, port,
> > >         (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
> > >   }
> > >   try {
> > >     // Do stuff with simpleConsumer.
> > >   } catch (Exception e) {
> > >     if (simpleConsumer != null) {
> > >       simpleConsumer.close();
> > >       simpleConsumer = null;
> > >     }
> > >   }
> > > }
> > >
> > > If there is a problem with the host name, or some DNS issue, we get
> > > an UnresolvedAddressException as expected and attempt to close the
> > > simpleConsumer. However, this does not actually get rid of the
> > > underlying socket, so we end up leaking an FD every time this
> > > happens. Though this is not a common case, I think there needs to be
> > > a way on the SimpleConsumer to release all OS resources that it is
> > > holding onto. Right now, if this keeps happening, the number of FDs
> > > consumed by the process keeps increasing until we hit the OS limit.
> > > As a user I cannot do anything other than call simpleConsumer.close().
> > > We need to be able to close the underlying socketChannel/socket when
> > > this kind of error happens.
> > >
> > > To reproduce, run this code with any garbage host name; running
> > > lsof -p on the process while it runs will show the number of open
> > > FDs increasing without limit.
> > >
> > > Thanks,
> > > Rajiv
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
