Here is the relevant stack trace:

java.nio.channels.UnresolvedAddressException: null

        at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) ~[na:1.7.0_55]
        at kafka.network.BlockingChannel.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer.connect(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.metrics.KafkaTimer.time(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> I am using 0.8.1. The source is here:
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
>
> Here is the definition of disconnect():
> private def disconnect() = {
>   if(blockingChannel.isConnected) {
>     debug("Disconnecting from " + host + ":" + port)
>     blockingChannel.disconnect()
>   }
> }
> It checks blockingChannel.isConnected before calling
> blockingChannel.disconnect(). If connect() throws an
> UnresolvedAddressException, isConnected is never set, so
> blockingChannel.disconnect() is never called. But by that point the
> socket has already been created, and it leaks.
>
> The same problem might be present in the connect method of
> BlockingChannel at
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala,
> though its own disconnect() checks both the connected flag and the
> channel:
>
> def disconnect() = lock synchronized {
>   // My comment: connected will not be set if we get an
>   // UnresolvedAddressException, but channel should NOT be null, so we
>   // will probably still do the right thing.
>   if(connected || channel != null) {
>     // closing the main socket channel *should* close the read channel
>     // but let's do it to be sure.
>     swallow(channel.close())
>     swallow(channel.socket.close())
>     if(readChannel != null) swallow(readChannel.close())
>     channel = null; readChannel = null; writeChannel = null
>     connected = false
>   }
> }
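One way to make such a connect path leak-proof (a sketch of the general pattern, not the actual Kafka fix; the class and method names here are made up) is to close the channel before propagating any connect failure:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class GuardedConnect {
    // Hypothetical helper: either returns a connected channel or closes the
    // one it opened before rethrowing, so a failed connect never leaks an FD.
    static SocketChannel connectOrClose(InetSocketAddress addr)
            throws IOException {
        SocketChannel channel = SocketChannel.open();
        try {
            channel.connect(addr);
            return channel;
        } catch (RuntimeException | IOException e) {
            channel.close(); // release the FD before propagating the failure
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        try {
            // UnresolvedAddressException is a RuntimeException, so the
            // helper closes the channel and rethrows it.
            connectOrClose(
                InetSocketAddress.createUnresolved("broker.invalid", 9092));
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getClass().getSimpleName());
        }
    }
}
```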
>
>
>
> On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Rajiv,
>>
>> Which version of Kafka are you using? I just checked SimpleConsumer's
>> code, and in its close() function, disconnect() is called, which will
>> close the socket.
>>
>> Guozhang
>>
>>
>> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com>
>> wrote:
>>
>> > Meant to write a run loop.
>> >
>> > void run() {
>> >   while (running) {
>> >     if (simpleConsumer == null) {
>> >       simpleConsumer = new SimpleConsumer(host, port,
>> >           (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
>> >     }
>> >     try {
>> >       // Do stuff with simpleConsumer.
>> >     } catch (Exception e) {
>> >       logger.error(e);  // Assume UnresolvedAddressException.
>> >       if (simpleConsumer != null) {
>> >         simpleConsumer.close();
>> >         simpleConsumer = null;
>> >       }
>> >     }
>> >   }
>> > }
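Until the consumer itself releases the socket on this path, one caller-side mitigation (a sketch; the helper name is made up) is to check that the host resolves before constructing the SimpleConsumer at all, so the leaking connect path is never entered:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostCheck {
    // Hypothetical guard: resolve the host up front so we never hand an
    // unresolvable name to SimpleConsumer, whose failed connect leaks an FD.
    static boolean resolvable(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolvable("localhost"));
        System.out.println(resolvable("no-such-broker.invalid"));
    }
}
```

This does not remove the race with DNS changing after the check, but it keeps a persistently bad host name from exhausting file descriptors.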
>> >
>> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com>
>> > wrote:
>> >
>> > > Here is my typical flow:
>> > > void run() {
>> > >   if (simpleConsumer == null) {
>> > >     simpleConsumer = new SimpleConsumer(host, port, (int)
>> > kafkaSocketTimeout,
>> > > kafkaRExeiveBufferSize, clientName);
>> > >   }
>> > >   try {
>> > >     // Do stuff with simpleConsumer.
>> > >    } catch (Exception e) {
>> > >      if (consumer != null) {
>> > >        simpleConsumer.close();
>> > >        simpleConsumer = null;
>> > >      }
>> > >   }
>> > > }
>> > >
>> > > If there is a problem with the host name, or some DNS issue, we get an
>> > > UnresolvedAddressException as expected and attempt to close the
>> > > simpleConsumer. However, this does not actually release the underlying
>> > > socket, so we leak an FD every time it happens. Though this is not a
>> > > common case, there needs to be a way for the SimpleConsumer to release
>> > > all the OS resources it is holding. Right now, if this keeps
>> > > happening, the number of FDs consumed by the process grows until we
>> > > hit the OS limit. As a user I cannot do anything other than call
>> > > simpleConsumer.close(); we need the underlying socketChannel/socket
>> > > to be closed when this kind of error happens.
>> > >
>> > > To reproduce, run this code with any garbage host name; running
>> > > lsof -p <pid> while it runs shows the number of open FDs increasing
>> > > without limit.
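The lsof check can also be done from inside the process on Linux (a sketch; Linux-only, since it reads /proc):

```java
import java.io.File;
import java.nio.channels.SocketChannel;

public class FdWatch {
    // Linux-only sketch: count this process's open file descriptors by
    // listing /proc/self/fd, a programmatic stand-in for `lsof -p <pid>`.
    static int openFds() {
        String[] entries = new File("/proc/self/fd").list();
        return entries == null ? -1 : entries.length;
    }

    public static void main(String[] args) throws Exception {
        int before = openFds();
        SocketChannel channel = SocketChannel.open(); // one new socket FD
        int during = openFds();
        channel.close();
        int after = openFds();
        // A leaked channel would keep `after` at the `during` level instead.
        System.out.println("fd gained: " + (during > before));
        System.out.println("fd released: " + (after < during));
    }
}
```

Sampling openFds() around each SimpleConsumer retry makes the leak (and any fix) visible directly in the application's own logs.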
>> > >
>> > > Thanks,
>> > > Rajiv
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
