[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14202291#comment-14202291 ]
Ewen Cheslack-Postava commented on KAFKA-1745:
----------------------------------------------

[~Vishal M] I looked into this a bit more. I was testing the new producer, which is why I was seeing different behavior. With your test program I can reproduce the issue. I tried to track down what was triggering the new KQUEUEs and PIPEs. I thought this would only happen when new sockets were created, but logging the creation of (and connection requests from) BlockingChannel doesn't indicate we're calling those after the initial setup. I tracked down the system calls with dtrace, but stack traces in dtrace are broken on OS X, so instead I had dtrace pause the process and then took a stack trace with jstack -F. This looks like the offending thread:

{quote}
Thread 25099: (state = IN_NATIVE)
 - sun.nio.ch.KQueueArrayWrapper.init() @bci=0 (Interpreted frame)
 - sun.nio.ch.KQueueArrayWrapper.<init>() @bci=59, line=100 (Interpreted frame)
 - sun.nio.ch.KQueueSelectorImpl.<init>(java.nio.channels.spi.SelectorProvider) @bci=51, line=87 (Interpreted frame)
 - sun.nio.ch.KQueueSelectorProvider.openSelector() @bci=5, line=42 (Interpreted frame)
 - sun.nio.ch.Util.getTemporarySelector(java.nio.channels.SelectableChannel) @bci=54, line=264 (Interpreted frame)
 - sun.nio.ch.SocketAdaptor$SocketInputStream.read(java.nio.ByteBuffer) @bci=197, line=214 (Interpreted frame)
 - sun.nio.ch.ChannelInputStream.read(byte[], int, int) @bci=101, line=103 (Interpreted frame)
 - java.nio.channels.Channels$ReadableByteChannelImpl.read(java.nio.ByteBuffer) @bci=84, line=385 (Interpreted frame)
 - kafka.utils.Utils$.read(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) @bci=2, line=380 (Interpreted frame)
 - kafka.network.BoundedByteBufferReceive.readFrom(java.nio.channels.ReadableByteChannel) @bci=26, line=54 (Interpreted frame)
 - kafka.network.Receive$class.readCompletely(kafka.network.Receive, java.nio.channels.ReadableByteChannel) @bci=15, line=56 (Interpreted frame)
 - kafka.network.BoundedByteBufferReceive.readCompletely(java.nio.channels.ReadableByteChannel) @bci=2, line=29 (Interpreted frame)
 - kafka.network.BlockingChannel.receive() @bci=20, line=111 (Interpreted frame)
 - kafka.producer.SyncProducer.liftedTree1$1(kafka.api.RequestOrResponse, boolean, scala.runtime.ObjectRef) @bci=18, line=76 (Interpreted frame)
 - kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(kafka.api.RequestOrResponse, boolean) @bci=33, line=73 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp() @bci=40, line=104 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply() @bci=1, line=104 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply() @bci=1, line=104 (Interpreted frame)
 - kafka.metrics.KafkaTimer.time(scala.Function0) @bci=9, line=33 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp() @bci=12, line=103 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1.apply() @bci=1, line=103 (Interpreted frame)
 - kafka.producer.SyncProducer$$anonfun$send$1.apply() @bci=1, line=103 (Interpreted frame)
 - kafka.metrics.KafkaTimer.time(scala.Function0) @bci=9, line=33 (Interpreted frame)
 - kafka.producer.SyncProducer.send(kafka.api.ProducerRequest) @bci=90, line=102 (Interpreted frame)
 - kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(int, scala.collection.mutable.Map) @bci=134, line=256 (Interpreted frame)
 - kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(scala.Tuple2) @bci=98, line=107 (Interpreted frame)
 - kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(java.lang.Object) @bci=5, line=99 (Interpreted frame)
 - scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(java.lang.Object) @bci=24, line=772 (Interpreted frame)
 - scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(scala.collection.mutable.DefaultEntry) @bci=19, line=98 (Interpreted frame)
 - scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(java.lang.Object) @bci=5, line=98 (Interpreted frame)
 - scala.collection.mutable.HashTable$class.foreachEntry(scala.collection.mutable.HashTable, scala.Function1) @bci=26, line=226 (Interpreted frame)
 - scala.collection.mutable.HashMap.foreachEntry(scala.Function1) @bci=2, line=39 (Interpreted frame)
 - scala.collection.mutable.HashMap.foreach(scala.Function1) @bci=10, line=98 (Interpreted frame)
 - scala.collection.TraversableLike$WithFilter.foreach(scala.Function1) @bci=13, line=771 (Interpreted frame)
 - kafka.producer.async.DefaultEventHandler.dispatchSerializedData(scala.collection.Seq) @bci=65, line=99 (Interpreted frame)
 - kafka.producer.async.DefaultEventHandler.handle(scala.collection.Seq) @bci=214, line=72 (Interpreted frame)
 - kafka.producer.Producer.send(scala.collection.Seq) @bci=45, line=76 (Interpreted frame)
 - kafka.javaapi.producer.Producer.send(kafka.producer.KeyedMessage) @bci=21, line=33 (Interpreted frame)
 - kafka.examples.KQTestX$1.run() @bci=83, line=73 (Interpreted frame)
 - java.util.concurrent.Executors$RunnableAdapter.call() @bci=4, line=471 (Interpreted frame)
 - java.util.concurrent.FutureTask.run() @bci=42, line=262 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
{quote}

This shows that NIO creates the new kqueue internally during a normal read (and KQueueSelectorImpl also creates a pipe): the blocking read through SocketAdaptor allocates a temporary Selector via sun.nio.ch.Util.getTemporarySelector, visible in the frames above. I did verify that this doesn't happen unless new threads are allocated, but I'm still not sure why using a new thread would trigger this or how to fix it, since it's internal to NIO. A standalone sketch of the mechanism follows, and a possible application-side mitigation is sketched after the quoted issue below.
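To make the mechanism concrete, here is a minimal, hypothetical repro sketch (mine, not [~Vishal M]'s KQTestX; the class name, timeout, and iteration count are made up). It mimics what BlockingChannel does, a blocking SocketChannel read with a socket read timeout set, and runs each read on a fresh, short-lived thread. Assuming the temporary selector is allocated once per thread, which is what the observations above suggest, each iteration should add one KQUEUE and a pair of PIPE entries to lsof output on OS X:

{code:java}
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class TempSelectorRepro {
    public static void main(String[] args) throws Exception {
        // Loopback socket pair; the client side mimics BlockingChannel:
        // a blocking SocketChannel with a socket-level read timeout.
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        final SocketChannel client = SocketChannel.open(server.getLocalAddress());
        server.accept(); // complete the connection; we never write to it

        // With a timeout set, SocketAdaptor reads via a temporary selector
        // (the sun.nio.ch.Util.getTemporarySelector frame in the stack trace).
        client.socket().setSoTimeout(200);
        final InputStream in = client.socket().getInputStream();

        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(new Runnable() {
                public void run() {
                    try {
                        in.read(); // blocks until the 200 ms timeout fires
                    } catch (SocketTimeoutException expected) {
                        // Fine: the per-thread temporary selector was created anyway.
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
            t.join(); // the thread dies, but its kqueue and pipe stay open
        }
        System.out.println("Inspect with: lsof -p "
                + ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
        Thread.sleep(60000); // keep the process alive so lsof can be run
    }
}
{code}

Running the same loop with a single reused thread should, by the same reasoning, create only one KQUEUE and one pair of PIPEs in total.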
> Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1745
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1745
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.1.1
>        Environment: Mac OS Mavericks
>           Reporter: Vishal
>           Priority: Critical
>
> Hi,
> I'm using the Java client API for Kafka. I wanted to send data to Kafka through a producer pool, since I'm using a sync producer. The thread that sends the data comes from a thread pool that grows and shrinks depending on usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created (observed using lsof). If I keep using the same thread it's fine, but when a new thread sends data to Kafka (using producer.send()) a new KQUEUE and 2 new PIPEs are created.
> That would be okay, except that when the thread is cleared from the thread pool and a new one is created, yet more KQUEUEs and PIPEs are created. The problem is that the old ones are never destroyed and keep showing up as open files. This is a major problem: the number of open files keeps increasing and never decreases.
> Please suggest any solutions.
> FYI, the number of TCP connections established from the producer system to the Kafka broker remains constant throughout.
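Since reusing a thread does not allocate new descriptors (per both the report and my testing), one application-side workaround is to keep the sender threads long-lived instead of letting the pool grow and shrink: a fixed-size pool caps the number of kqueue/pipe descriptors at the pool size. Below is a hypothetical wrapper (the class name and pool size are made up, and it is not a Kafka API); it works around, rather than fixes, the NIO-internal behavior:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

public class PinnedSenderPool {
    // A fixed pool's core threads are created once and, by default, never
    // retire, so each thread's temporary selector is reused on every send
    // instead of being stranded when an idle thread is reclaimed.
    private final ExecutorService senders = Executors.newFixedThreadPool(4);

    public Future<?> send(final Producer<String, String> producer,
                          final KeyedMessage<String, String> message) {
        return senders.submit(new Runnable() {
            public void run() {
                // All sends happen on one of the four long-lived threads.
                producer.send(message);
            }
        });
    }

    public void shutdown() {
        senders.shutdown();
    }
}
{code}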