Ewen, you are right: the patch was committed on Feb. 20th last year. I will leave a comment and close that ticket.
On Tue, Jan 27, 2015 at 7:24 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

> This was fixed in commit 6ab9b1ecd8 for KAFKA-1235, and it looks like that
> will only be included in 0.8.2.
>
> Guozhang, it looks like you wrote the patch and Jun reviewed it, but the
> bug is still open, and there is a comment that moved it to 0.9 after the
> commit was already made. Was the commit a mistake, or did we just forget
> to close it?
>
> On Tue, Jan 27, 2015 at 10:20 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>
>> Here is the relevant stack trace:
>>
>> java.nio.channels.UnresolvedAddressException: null
>>     at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]
>>     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) ~[na:1.7.0_55]
>>     at kafka.network.BlockingChannel.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.consumer.SimpleConsumer.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>     at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>>
>> On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>>
>>> I am using 0.8.1. The source is here:
>>> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
>>>
>>> Here is the definition of disconnect():
>>>
>>>     private def disconnect() = {
>>>       if(blockingChannel.isConnected) {
>>>         debug("Disconnecting from " + host + ":" + port)
>>>         blockingChannel.disconnect()
>>>       }
>>>     }
>>>
>>> It checks blockingChannel.isConnected before calling
>>> blockingChannel.disconnect(). I think that if there is an
>>> UnresolvedAddressException, isConnected is never set, so
>>> blockingChannel.disconnect() is never called. But by this point we have
>>> already created a socket, and we will leak it.
>>>
>>> The same problem might be present in the connect method of
>>> BlockingChannel at
>>> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala,
>>> though its own disconnect method does check both the connected flag and
>>> the channel:
>>>
>>>     def disconnect() = lock synchronized {
>>>       // My comment: connected will not be set if we get an
>>>       // UnresolvedAddressException, but channel should NOT be null, so
>>>       // we will probably still do the right thing.
>>>       if(connected || channel != null) {
>>>         // closing the main socket channel *should* close the read channel
>>>         // but let's do it to be sure.
>>>         swallow(channel.close())
>>>         swallow(channel.socket.close())
>>>         if(readChannel != null) swallow(readChannel.close())
>>>         channel = null; readChannel = null; writeChannel = null
>>>         connected = false
>>>       }
>>>     }
>>>
>>> On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Rajiv,
>>>>
>>>> Which version of Kafka are you using? I just checked SimpleConsumer's
>>>> code, and in its close() function, disconnect() is called, which will
>>>> close the socket.
>>>>
>>>> Guozhang
>>>>
>>>> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>>>>
>>>>> Meant to write a run loop:
>>>>>
>>>>>     void run() {
>>>>>       while (running) {
>>>>>         if (simpleConsumer == null) {
>>>>>           simpleConsumer = new SimpleConsumer(host, port,
>>>>>               (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
>>>>>         }
>>>>>         try {
>>>>>           // Do stuff with simpleConsumer.
>>>>>         } catch (Exception e) {
>>>>>           logger.error(e); // Assume UnresolvedAddressException.
>>>>>           if (simpleConsumer != null) {
>>>>>             simpleConsumer.close();
>>>>>             simpleConsumer = null;
>>>>>           }
>>>>>         }
>>>>>       }
>>>>>     }
>>>>>
>>>>> On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>>>>>
>>>>>> Here is my typical flow:
>>>>>>
>>>>>>     void run() {
>>>>>>       if (simpleConsumer == null) {
>>>>>>         simpleConsumer = new SimpleConsumer(host, port,
>>>>>>             (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
>>>>>>       }
>>>>>>       try {
>>>>>>         // Do stuff with simpleConsumer.
>>>>>>       } catch (Exception e) {
>>>>>>         if (simpleConsumer != null) {
>>>>>>           simpleConsumer.close();
>>>>>>           simpleConsumer = null;
>>>>>>         }
>>>>>>       }
>>>>>>     }
>>>>>>
>>>>>> If there is a problem with the host name, or some DNS issue, we get an
>>>>>> UnresolvedAddressException as expected and attempt to close the
>>>>>> simpleConsumer. However, this does not really get rid of the
>>>>>> underlying socket, so we end up leaking an FD every time this happens.
>>>>>> Though this is not a common case, I think there needs to be a way on
>>>>>> the SimpleConsumer to get rid of all OS resources it is holding onto.
>>>>>> Right now, if this keeps happening, the number of FDs consumed by the
>>>>>> process keeps increasing until we hit the OS limits. As a user, I
>>>>>> cannot do anything else but call simpleConsumer.close(). We need to be
>>>>>> able to close the underlying socketChannel/socket when this kind of
>>>>>> error happens.
>>>>>>
>>>>>> To reproduce, one can run this code with any garbage host name;
>>>>>> running lsof -p while it runs will show that the number of open FDs
>>>>>> increases without limit.
>>>>>>
>>>>>> Thanks,
>>>>>> Rajiv
>>>>
>>>> --
>>>> -- Guozhang
>
> --
> Thanks,
> Ewen

--
-- Guozhang
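
The leak discussed in this thread can be reproduced with plain NIO, outside Kafka entirely. The sketch below (the class and method names are mine, not Kafka's or this thread's) shows that a SocketChannel whose connect() throws UnresolvedAddressException never becomes "connected", yet its file descriptor stays open; any disconnect logic guarded by an isConnected-style check will therefore skip the close, which is the pattern Rajiv points out in SimpleConsumer.disconnect(). Using InetSocketAddress.createUnresolved makes the failure deterministic, with no DNS lookup involved.

```java
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

public class FdLeakDemo {
    // Returns true if the channel's file descriptor was still open after the
    // failed connect, i.e. the FD would have leaked without an explicit close.
    static boolean channelLeftOpenAfterFailedConnect() throws Exception {
        SocketChannel channel = SocketChannel.open(); // FD allocated here
        InetSocketAddress bad =
            InetSocketAddress.createUnresolved("no-such-host.invalid", 9092);
        try {
            channel.connect(bad); // throws before any connection is attempted
            channel.close();
            return false;         // not reached for an unresolved address
        } catch (UnresolvedAddressException e) {
            // The channel never became connected, but the FD is still held,
            // so a close guarded by an isConnected check would be skipped.
            boolean wasStillOpen = channel.isOpen();
            channel.close();      // close unconditionally to release the FD
            return wasStillOpen;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("FD still open after failed connect: "
                + channelLeftOpenAfterFailedConnect());
    }
}
```

Looping on this without the unconditional close() while watching lsof -p on the process shows the open-FD count growing, matching Rajiv's reproduction.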