Rajiv,

Which version of Kafka are you using? I just checked SimpleConsumer's code,
and in its close() function, disconnect() is called, which will close the
socket.
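
For reference, here is a minimal sketch of how a socket channel can stay open
after a failed connect unless it is closed explicitly. It uses plain java.nio
only and is not Kafka's code; whether SimpleConsumer hits this path depends on
the version. The class name, method names and the host "garbage.invalid" are
made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

// Hypothetical illustration class; not part of Kafka.
final class ChannelLeakSketch {

    // Opens a channel and tries to connect it to an unresolved address.
    // Returns the channel so the caller can check whether it is still open.
    public static SocketChannel openAndFailToConnect(String host, int port) {
        try {
            SocketChannel channel = SocketChannel.open(); // allocates an FD
            try {
                // createUnresolved() skips DNS, so connect() rejects the address.
                channel.connect(InetSocketAddress.createUnresolved(host, port));
            } catch (UnresolvedAddressException expected) {
                // connect() failed, but nothing has closed the channel yet.
            }
            return channel;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Explicitly closes the channel, which releases its file descriptor.
    public static void release(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SocketChannel channel = openAndFailToConnect("garbage.invalid", 9092);
        System.out.println("open after failed connect: " + channel.isOpen());
        release(channel);
        System.out.println("open after close: " + channel.isOpen());
    }
}
```

If a wrapper only closes the channel when it believes it is connected, a
failure like this one would leave the FD open, which would match the symptom
you describe.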

Guozhang


On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Meant to write a run loop.
>
> void run() {
>   while (running) {
>     if (simpleConsumer == null) {
>       simpleConsumer = new SimpleConsumer(host, port,
>           (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
>     }
>     try {
>       // Do stuff with simpleConsumer.
>     } catch (Exception e) {
>       logger.error(e);  // Assume UnresolvedAddressException.
>       if (simpleConsumer != null) {
>         simpleConsumer.close();
>         simpleConsumer = null;
>       }
>     }
>   }
> }
>
> On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Here is my typical flow:
> > void run() {
> >   if (simpleConsumer == null) {
> >     simpleConsumer = new SimpleConsumer(host, port,
> >         (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
> >   }
> >   try {
> >     // Do stuff with simpleConsumer.
> >   } catch (Exception e) {
> >     if (simpleConsumer != null) {
> >       simpleConsumer.close();
> >       simpleConsumer = null;
> >     }
> >   }
> > }
> >
> > If there is a problem with the host name, or a DNS issue, we get an
> > UnresolvedAddressException as expected and attempt to close the
> > simpleConsumer. However, this does not actually release the underlying
> > socket, so we leak an FD every time this happens. Though this is not a
> > common case, I think the SimpleConsumer needs a way to release all the
> > OS resources it is holding onto. Right now, if this keeps happening, the
> > number of FDs consumed by the process keeps increasing until we hit the
> > OS limits. As a user I cannot do anything other than call
> > simpleConsumer.close(). We need to be able to close the underlying
> > socketChannel/socket when this kind of error happens.
> >
> > To reproduce, run this code with any garbage host name. Running
> > lsof -p <pid> while it runs will show the number of open FDs
> > increasing without limit.
> >
> > Thanks,
> > Rajiv
> >
> >
>



-- 
-- Guozhang
