[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817 ]
Vishal commented on KAFKA-1745: ------------------------------- [~junrao] Sorry, I should have been more clear. [~ewencp] I checked and I don't think I'm leaking producers. Anyway, this is a sample test case which can replicate the problem (check lsof every 10 seconds to notice the increase in KQUEUEs and PIPESs though the producer object is being reused): {code} public class Test { private static Queue<Producer<String, String>> producerPool = new ConcurrentLinkedQueue<Producer<String,String>>(); private static ProducerConfig config; static { Properties props = new Properties(); props.put("metadata.broker.list", "IP:Port"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); config = new ProducerConfig(props); } public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); tpe.allowCoreThreadTimeOut(true); Thread run = new Thread(new Runnable() { @Override public void run() { Producer<String, String> producer = producerPool.poll(); if(producer == null) { producer = new Producer<String, String>(config); } KeyedMessage<String, String> data = new KeyedMessage<String, String>("SaSMQ", "0", "test"); producer.send(data); producerPool.add(producer); } }); while(true) //To make sure that the main program does not terminate { for(int i = 0;i<100; i++) { tpe.submit(run); } Thread.sleep(10000); //10 seconds.... So that keepalive time is exceeded by the thread pool and threads are cleared. } } } {code} > Each new thread creates a PIPE and KQUEUE as open files during > producer.send() and does not get cleared when the thread that creates them is > cleared. > ----------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-1745 > URL: https://issues.apache.org/jira/browse/KAFKA-1745 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.8.1.1 > Environment: Mac OS Mavericks > Reporter: Vishal > Priority: Critical > > Hi, > I'm using the java client API for Kafka. I wanted to send data to Kafka > by using a producer pool as I'm using a sync producer. The thread that sends > the data is from the thread pool that grows and shrinks depending on the > usage. So, when I try to send data from one thread, 1 KQUEUE and 2 PIPES are > created (got this info by using lsof). If I keep using the same thread it's > fine but when a new thread sends data to Kafka (using producer.send() ) a new > KQUEUE and 2 PIPEs are created. > This is okay, but when the thread is cleared from the thread pool and a new > thread is created, then new KQUEUEs and PIPEs are created. The problem is > that the old ones which were created are not getting destroyed and they are > showing up as open files. This is causing a major problem as the number of > open file keep increasing and does not decrease. > Please suggest any solutions. > FYI, the number of TCP connections established from the producer system to > the Kafka Broker remain constant throughout. -- This message was sent by Atlassian JIRA (v6.3.4#6332)