Here is my typical flow:
void run() {
  if (simpleConsumer == null) {
    simpleConsumer = new SimpleConsumer(host, port, (int) kafkaSocketTimeout,
        kafkaReceiveBufferSize, clientName);
  }
  try {
    // Do stuff with simpleConsumer.
  } catch (Exception e) {
    // Drop the consumer so the next run() creates a fresh one.
    if (simpleConsumer != null) {
      simpleConsumer.close();
      simpleConsumer = null;
    }
  }
}

If there is a problem with the host name, or some DNS issue, we get an
UnresolvedAddressException as expected and attempt to close the
simpleConsumer. However, close() does not actually release the underlying
socket, so we leak an FD every time this happens. Though this is not a
common case, I think SimpleConsumer needs a way to release all the OS
resources it is holding onto. Right now, if this keeps happening, the
number of FDs consumed by the process keeps increasing until we hit the OS
limit. As a user I cannot do anything other than call
simpleConsumer.close(). We need to be able to close the underlying
socketChannel/socket when this kind of error happens.

To reproduce, run the code above with any garbage host name. Running
lsof -p <pid> against the process while it runs will show the number of
open FDs increasing without limit.
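
For what it's worth, here is a minimal JDK-only sketch of the behaviour I
suspect is involved. It does not use SimpleConsumer at all, and I have not
confirmed that this is what SimpleConsumer does internally. The class name,
method names, host name and port are made up for illustration. It shows that
a SocketChannel is still open after connect() throws
UnresolvedAddressException, so it leaks unless someone closes it explicitly,
and it shows one possible connect-or-close pattern.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

// Hypothetical demo class; not part of Kafka.
class UnresolvedConnectSketch {

    // Returns true if the channel is still open after connect() fails
    // with UnresolvedAddressException.
    static boolean channelStaysOpenAfterFailedConnect() throws IOException {
        SocketChannel channel = SocketChannel.open(); // allocates an FD
        try {
            // createUnresolved() skips the DNS lookup, so the address is
            // guaranteed to be unresolved. Host and port are placeholders.
            channel.connect(
                InetSocketAddress.createUnresolved("no-such-host.invalid", 9092));
            return false; // not expected: connect() should have thrown
        } catch (UnresolvedAddressException e) {
            return channel.isOpen();
        } finally {
            channel.close(); // clean up so the demo itself does not leak
        }
    }

    // Connects the channel, closing it if connect() fails for any reason,
    // including unchecked exceptions such as UnresolvedAddressException.
    static void connectOrClose(SocketChannel channel, InetSocketAddress address)
            throws IOException {
        try {
            channel.connect(address);
        } catch (IOException | RuntimeException e) {
            channel.close(); // release the FD before propagating the error
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("open after failed connect: "
            + channelStaysOpenAfterFailedConnect());

        SocketChannel channel = SocketChannel.open();
        try {
            connectOrClose(channel,
                InetSocketAddress.createUnresolved("no-such-host.invalid", 9092));
        } catch (UnresolvedAddressException e) {
            System.out.println("open after connectOrClose: " + channel.isOpen());
        }
    }
}
```

If SimpleConsumer opens its channel before connecting and only closes it once
a connection was established, that would explain the leak, but again that is
a guess on my part.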

Thanks,
Rajiv
