Here is the relevant stack trace:

java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) ~[na:1.7.0_55]
    at kafka.network.BlockingChannel.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.consumer.SimpleConsumer.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
    at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> I am using 0.8.1.
> The source is here:
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
>
> Here is the definition of disconnect():
>
> private def disconnect() = {
>   if(blockingChannel.isConnected) {
>     debug("Disconnecting from " + host + ":" + port)
>     blockingChannel.disconnect()
>   }
> }
>
> It checks blockingChannel.isConnected before calling
> blockingChannel.disconnect(). I think that if there is an
> UnresolvedAddressException, isConnected is never set, so
> blockingChannel.disconnect() is never called. But by this point we have
> already created a socket, and we leak it.
>
> The same problem might be present in the connect method of
> BlockingChannel at
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala,
> though its own disconnect method checks both the connected flag and the
> channel reference:
>
> def disconnect() = lock synchronized {
>   // My comment: connected will not be set if we get an
>   // UnresolvedAddressException, but channel should NOT be null, so we
>   // will probably still do the right thing.
>   if(connected || channel != null) {
>     // closing the main socket channel *should* close the read channel
>     // but let's do it to be sure.
>     swallow(channel.close())
>     swallow(channel.socket.close())
>     if(readChannel != null) swallow(readChannel.close())
>     channel = null; readChannel = null; writeChannel = null
>     connected = false
>   }
> }
>
> On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Rajiv,
>>
>> Which version of Kafka are you using? I just checked SimpleConsumer's
>> code, and in its close() function, disconnect() is called, which will
>> close the socket.
>>
>> Guozhang
>>
>> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com>
>> wrote:
>>
>> > Meant to write a run loop.
>> >
>> > void run() {
>> >   while (running) {
>> >     if (simpleConsumer == null) {
>> >       simpleConsumer = new SimpleConsumer(host, port,
>> >           (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
>> >     }
>> >     try {
>> >       // Do stuff with simpleConsumer.
>> >     } catch (Exception e) {
>> >       logger.error(e);  // Assume UnresolvedAddressException.
>> >       if (simpleConsumer != null) {
>> >         simpleConsumer.close();
>> >         simpleConsumer = null;
>> >       }
>> >     }
>> >   }
>> > }
>> >
>> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com>
>> > wrote:
>> >
>> > > Here is my typical flow:
>> > >
>> > > void run() {
>> > >   if (simpleConsumer == null) {
>> > >     simpleConsumer = new SimpleConsumer(host, port,
>> > >         (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
>> > >   }
>> > >   try {
>> > >     // Do stuff with simpleConsumer.
>> > >   } catch (Exception e) {
>> > >     if (simpleConsumer != null) {
>> > >       simpleConsumer.close();
>> > >       simpleConsumer = null;
>> > >     }
>> > >   }
>> > > }
>> > >
>> > > If there is a problem with the host name, or some DNS issue, we get
>> > > an UnresolvedAddressException as expected and attempt to close the
>> > > simpleConsumer. However, this does not really get rid of the
>> > > underlying socket, so we leak an FD every time it happens. Though
>> > > this is not a common case, I think there needs to be a way on
>> > > SimpleConsumer to release all of the OS resources it is holding
>> > > onto. Right now, if this keeps happening, the number of FDs consumed
>> > > by the process keeps increasing until we hit the OS limit. As a user
>> > > I cannot do anything else but call simpleConsumer.close(). We need
>> > > to be able to close the underlying socketChannel/socket when this
>> > > kind of error happens.
>> > >
>> > > To reproduce, run this code with any garbage host name; running
>> > > lsof -p on the process will show the number of open FDs increasing
>> > > without limit.
>> > >
>> > > Thanks,
>> > > Rajiv
>> >
>>
>> --
>> -- Guozhang
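The leak the thread describes can be reproduced outside Kafka with plain NIO. The Javadoc for SocketChannel.connect() only guarantees that the channel is closed when the method fails with a *checked* exception; UnresolvedAddressException is unchecked, so the channel (and the file descriptor allocated by SocketChannel.open()) stays open. The sketch below illustrates this; the class name and hostname are mine, chosen for illustration:

```java
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

public class LeakDemo {
    public static void main(String[] args) throws Exception {
        // createUnresolved() always yields an unresolved address, making the
        // failure deterministic regardless of DNS configuration.
        InetSocketAddress bad =
                InetSocketAddress.createUnresolved("no-such-host.invalid", 9092);

        SocketChannel channel = SocketChannel.open(); // an FD is allocated here
        try {
            channel.connect(bad); // throws UnresolvedAddressException (unchecked)
        } catch (UnresolvedAddressException e) {
            // The channel never became "connected", so cleanup code guarded by
            // an isConnected check (like BlockingChannel.disconnect()) skips
            // it -- but the FD from open() is still alive.
            System.out.println("open after failed connect: " + channel.isOpen());
            // prints: open after failed connect: true
        } finally {
            // Closing unconditionally releases the FD regardless of connect state.
            channel.close();
        }
        System.out.println("open after close: " + channel.isOpen());
        // prints: open after close: false
    }
}
```

Applied to the run loops in the thread, the same observation suggests a caller-side workaround: resolve the host first with InetAddress.getByName(host), which throws UnknownHostException before any socket FD is allocated, and only construct the SimpleConsumer once resolution succeeds.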