Meant to write a run loop. void run() { while (running) { if (simpleConsumer == null) { simpleConsumer = new SimpleConsumer(host, port, (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName); } try { // Do stuff with simpleConsumer. } catch (Exception e) { logger.error(e); // Assume UnresolvedAddressException. if (consumer != null) { simpleConsumer.close(); simpleConsumer = null; } } } }
On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com> wrote: > Here is my typical flow: > void run() { > if (simpleConsumer == null) { > simpleConsumer = new SimpleConsumer(host, port, (int) kafkaSocketTimeout, > kafkaRExeiveBufferSize, clientName); > } > try { > // Do stuff with simpleConsumer. > } catch (Exception e) { > if (consumer != null) { > simpleConsumer.close(); > simpleConsumer = null; > } > } > } > > If there is a problem with the host name, or some DNS issues, we get an > UnresolvedAddressException as expected and attempt to close the > simpleConsumer. However this does not really get rid of the underlying > socket. So we end up leaking a FD every time this happens. Though this is > not a common case I think there needs to be a way on the SimpleConsumer to > get rid of all OS resources that it is holding onto. Right now if this > keeps happening the number of FDs being consumed by the process keeps > increasing till we hit the OS limits. As a user I cannot do anything else > but call simpleConsumer.close(). We need to be able to close the underlying > socketChannel/socket when this kind of an error happens. > > To reproduce, one can just run this code but just put in any garbage host > name, running lsof -p while running this will show that the open FDs > increases without limit. > > Thanks, > Rajiv > >