Here is my typical flow:

    void run() {
        if (simpleConsumer == null) {
            simpleConsumer = new SimpleConsumer(host, port, (int) kafkaSocketTimeout,
                    kafkaReceiveBufferSize, clientName);
        }
        try {
            // Do stuff with simpleConsumer.
        } catch (Exception e) {
            if (simpleConsumer != null) {
                simpleConsumer.close();
                simpleConsumer = null;
            }
        }
    }

If there is a problem with the host name, or a DNS issue, we get an UnresolvedAddressException as expected and attempt to close the simpleConsumer. However, this does not actually release the underlying socket, so we leak a file descriptor (FD) every time this happens.

This is not a common case, but I think SimpleConsumer needs a way to release all the OS resources it is holding. Right now, if the error keeps recurring, the number of FDs held by the process keeps growing until we hit the OS limit. As a user, I cannot do anything other than call simpleConsumer.close(). We need to be able to close the underlying socketChannel/socket when this kind of error happens.

To reproduce, run the code above with any garbage host name. Running lsof -p <pid> against the process while it runs will show the number of open FDs increasing without limit.

Thanks,
Rajiv
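
For illustration, here is a minimal sketch of how this kind of leak can arise and be avoided with plain java.nio. It assumes, without this being confirmed against the Kafka source, that the channel is opened before connect() is attempted, so a failed connect leaves an open channel behind unless it is closed explicitly. The class ChannelCloseSketch, the helpers openAndConnect and closeQuietly, and the host name garbage.invalid are all hypothetical and are not part of Kafka.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

// Hypothetical sketch; none of these names exist in Kafka.
class ChannelCloseSketch {

    // Opens a channel and tries to connect. If the connect fails, the channel
    // is closed before returning, so its file descriptor is released.
    // The channel is returned either way so the caller can inspect isOpen().
    static SocketChannel openAndConnect(InetSocketAddress address) {
        SocketChannel channel;
        try {
            // The socket descriptor is allocated here, before any connect.
            channel = SocketChannel.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            // Throws UnresolvedAddressException when the host did not resolve.
            channel.connect(address);
        } catch (UnresolvedAddressException | IOException e) {
            // Without this close, the descriptor stays open after the failure.
            closeQuietly(channel);
        }
        return channel;
    }

    // Closes the channel, ignoring any IOException raised by close().
    static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing useful to do if close itself fails.
        }
    }

    public static void main(String[] args) {
        // createUnresolved skips DNS, so the address is guaranteed unresolved.
        InetSocketAddress bad = InetSocketAddress.createUnresolved("garbage.invalid", 9092);
        SocketChannel channel = openAndConnect(bad);
        System.out.println("open after failed connect: " + channel.isOpen());
    }
}
```

Removing the closeQuietly call from the catch block should leave the channel open after the failed connect, which is the behaviour I would expect to show up as a growing FD count in lsof.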