I am using Kafka 0.8.1. The source is here: https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
Here is the definition of disconnect():

    private def disconnect() = {
      if(blockingChannel.isConnected) {
        debug("Disconnecting from " + host + ":" + port)
        blockingChannel.disconnect()
      }
    }

It checks blockingChannel.isConnected before calling
blockingChannel.disconnect(). I think that if there is an
UnresolvedAddressException, isConnected is never set, so
blockingChannel.disconnect() is never called. But by this point we have
already created a socket, and we leak it. The same problem might be present
in the connect() method of BlockingChannel at
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala,
though its own disconnect() method does seem to check both conditions:

    def disconnect() = lock synchronized {
      // My comment: connected will not be set if we get an
      // UnresolvedAddressException, but channel should NOT be null,
      // so we will probably still do the right thing.
      if(connected || channel != null) {
        // closing the main socket channel *should* close the read channel
        // but let's do it to be sure.
        swallow(channel.close())
        swallow(channel.socket.close())
        if(readChannel != null) swallow(readChannel.close())
        channel = null; readChannel = null; writeChannel = null
        connected = false
      }
    }

On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> Rajiv,
>
> Which version of Kafka are you using? I just checked SimpleConsumer's
> code, and in its close() function, disconnect() is called, which will
> close the socket.
>
> Guozhang
>
>
> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Meant to write a run loop.
> >
> >     void run() {
> >       while (running) {
> >         if (simpleConsumer == null) {
> >           simpleConsumer = new SimpleConsumer(host, port,
> >               (int) kafkaSocketTimeout, kafkaReceiveBufferSize,
> >               clientName);
> >         }
> >         try {
> >           // Do stuff with simpleConsumer.
> >         } catch (Exception e) {
> >           logger.error(e); // Assume UnresolvedAddressException.
> >           if (simpleConsumer != null) {
> >             simpleConsumer.close();
> >             simpleConsumer = null;
> >           }
> >         }
> >       }
> >     }
> >
> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Here is my typical flow:
> > >
> > >     void run() {
> > >       if (simpleConsumer == null) {
> > >         simpleConsumer = new SimpleConsumer(host, port,
> > >             (int) kafkaSocketTimeout, kafkaReceiveBufferSize,
> > >             clientName);
> > >       }
> > >       try {
> > >         // Do stuff with simpleConsumer.
> > >       } catch (Exception e) {
> > >         if (simpleConsumer != null) {
> > >           simpleConsumer.close();
> > >           simpleConsumer = null;
> > >         }
> > >       }
> > >     }
> > >
> > > If there is a problem with the host name, or some DNS issue, we get an
> > > UnresolvedAddressException as expected and attempt to close the
> > > simpleConsumer. However, this does not really get rid of the
> > > underlying socket, so we end up leaking an FD every time it happens.
> > > Though this is not a common case, I think there needs to be a way on
> > > the SimpleConsumer to get rid of all OS resources that it is holding
> > > onto. Right now, if this keeps happening, the number of FDs consumed
> > > by the process keeps increasing until we hit the OS limit. As a user
> > > I cannot do anything other than call simpleConsumer.close(). We need
> > > to be able to close the underlying socketChannel/socket when this
> > > kind of error happens.
> > >
> > > To reproduce, run this code with any garbage host name; running
> > > lsof -p <pid> while it runs shows the number of open FDs increasing
> > > without limit.
> > >
> > > Thanks,
> > > Rajiv
> > >
>
>
> --
> -- Guozhang
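[Editor's note] The leak described in this thread can be reproduced outside of Kafka with plain NIO. Below is a minimal sketch, not Kafka code: the class name ConnectLeakDemo is made up for illustration, and "no-such-host.invalid" is a deliberately unresolvable placeholder (the .invalid TLD is reserved and never resolves). It shows why a SocketChannel opened before a failed connect() must be closed explicitly, which is the cleanup BlockingChannel.connect appears to be missing.

```java
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class ConnectLeakDemo {

    // Leaky pattern, mirroring the failure mode discussed above:
    // the channel (and its FD) is created by open(), but if connect()
    // throws UnresolvedAddressException the caller never gets a handle
    // to close, so the FD is leaked.
    static SocketChannel connectLeaky(String host, int port) throws Exception {
        SocketChannel channel = SocketChannel.open();
        // Throws UnresolvedAddressException if DNS resolution failed.
        channel.connect(new InetSocketAddress(host, port));
        return channel;
    }

    // Safe pattern: if connect() fails for any reason, close the channel
    // before propagating, so the FD is released.
    static SocketChannel connectSafely(String host, int port) throws Exception {
        SocketChannel channel = SocketChannel.open();
        try {
            channel.connect(new InetSocketAddress(host, port));
            return channel;
        } catch (Exception e) {
            channel.close(); // release the FD before rethrowing
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            connectSafely("no-such-host.invalid", 9092);
        } catch (Exception e) {
            // With an unresolvable host this is UnresolvedAddressException,
            // and no FD is left behind.
            System.out.println("connect failed: " + e.getClass().getSimpleName());
        }
    }
}
```

Running connectLeaky in a loop against an unresolvable host and watching lsof -p <pid> shows the FD count climbing, while connectSafely holds it steady.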