Meant to write a run loop.

void run() {
  while (running) {
    if (simpleConsumer == null) {
      simpleConsumer = new SimpleConsumer(host, port,
(int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName);
    }
    try {
      // Do stuff with simpleConsumer.
    } catch (Exception e) {
      logger.error(e);  // Assume UnresolvedAddressException.
      if (consumer != null) {
        simpleConsumer.close();
        simpleConsumer = null;
      }
    }
  }
}

On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Here is my typical flow:
> void run() {
>   if (simpleConsumer == null) {
>     simpleConsumer = new SimpleConsumer(host, port, (int) kafkaSocketTimeout,
> kafkaRExeiveBufferSize, clientName);
>   }
>   try {
>     // Do stuff with simpleConsumer.
>    } catch (Exception e) {
>      if (consumer != null) {
>        simpleConsumer.close();
>        simpleConsumer = null;
>      }
>   }
> }
>
> If there is a problem with the host name, or some DNS issues, we get an
> UnresolvedAddressException as expected and attempt to close the
> simpleConsumer. However this does not really get rid of the underlying
> socket. So we end up leaking a FD every time this happens. Though this is
> not a common case I think there needs to be a way on the SimpleConsumer to
> get rid of all OS resources that it is holding onto. Right now if this
> keeps happening the number of FDs being consumed by the process keeps
> increasing till we hit the OS limits. As a user I cannot do anything else
> but call simpleConsumer.close(). We need to be able to close the underlying
> socketChannel/socket when this kind of an error happens.
>
> To reproduce, one can just run this code but just put in any garbage host
> name, running lsof -p while running this will show that the open FDs
> increases without limit.
>
> Thanks,
> Rajiv
>
>

Reply via email to