I am attaching my complete producer code.
Creating the KafkaProducer bean in XML:

    <bean id="kafkaESBProducer" class="com.snapdeal.coms.kafka.KafkaProducer">
        <constructor-arg name="topic"
            value="${KAFKA_ESB_TOPIC_NAME:kafka_topic_coms_esb_${COMS_PROFILE:dev}_${USER}}" />
        <constructor-arg name="producerProperties">
            <props>
                <prop key="metadata.broker.list">${KAFKA_PRODUCER_BROKER_LIST}</prop>
            </props>
        </constructor-arg>
    </bean>

    public class KafkaProducer
    {
        private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);

        private static final String SYSTEM_USER_NAME_PROPERTY = "user.name";
        private static final String CONFIG_PARAM_CLIENT_ID = "client.id";
        private static final String CLIENT_ID_FORMAT_STR = "kafka.coms.producer.%s.%s.%s";

        private enum ConfigParam
        {
            SERIALIZER_CLASS("serializer.class",
                CommonPropertyParam.KAFKA_PRODUCER_SERIALIZER_CLASS),
            KEY_SERIALIZER_CLASS("key.serializer.class",
                CommonPropertyParam.KAFKA_PRODUCER_PARTITION_KEY_SERIALIZER_CLASS),
            // commenting this to use kafka default partitioner
            // PARTITIONER_CLASS("partitioner.class",
            //     CommonPropertyParam.KAFKA_PRODUCER_PARTITIONER_CLASS),
            REQUEST_REQUIRED_ACKS("request.required.acks",
                CommonPropertyParam.KAFKA_PRODUCER_REQUEST_REQUIRED_ACKS);

            private final String myName;
            private final PropertyParam myParam;

            ConfigParam(String name, PropertyParam param)
            {
                myName = name;
                myParam = param;
            }

            public String getName()
            {
                return myName;
            }

            public PropertyParam getParam()
            {
                return myParam;
            }
        }

        private final String myTopic;
        private final Properties myProducerProperties;
        private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;

        @Autowired
        private COMSConfiguration myAppConfig;

        public KafkaProducer(String topic, Properties producerProperties)
        {
            LOG.info("Creating Kafka Producer instance: {}", this);
            myTopic = topic;
            myProducerProperties = producerProperties;
        }

        @PostConstruct
        private void initializeProducer()
        {
            LOG.info("Initializing Kafka Producer for topic: {}", getTopic());

            // Set producer unique client id
            String currentUser = System.getProperty(SYSTEM_USER_NAME_PROPERTY);
            String currentJVMName = ManagementFactory.getRuntimeMXBean().getName();
            currentJVMName = currentJVMName.replace('@', '_');
            String uniqueClientId = String.format(CLIENT_ID_FORMAT_STR,
                getTopic(), currentUser, currentJVMName);
            // containsKey, not contains: Properties.contains() tests values, not keys
            if (myProducerProperties.containsKey(CONFIG_PARAM_CLIENT_ID)) {
                uniqueClientId += ":"
                    + myProducerProperties.getProperty(CONFIG_PARAM_CLIENT_ID);
            }
            myProducerProperties.setProperty(CONFIG_PARAM_CLIENT_ID,
                uniqueClientId);

            // Set reasonable defaults for required params
            for (ConfigParam cp : ConfigParam.values()) {
                if (!myProducerProperties.containsKey(cp.getName())) {
                    String cpValue = myAppConfig.getPropertyValue(cp.getParam());
                    myProducerProperties.setProperty(cp.getName(), cpValue);
                }
            }

            myProducer =
                new Producer<>(new ProducerConfig(myProducerProperties));
            LOG.info("Initialized Kafka Producer for topic: {} and properties {}",
                getTopic(), myProducerProperties);
        }

        public String getTopic()
        {
            return myTopic;
        }

        public Producer<KafkaPartitionKey, KafkaEventWrapper> getProducer()
        {
            return myProducer;
        }

        public void send(KeyedMessage<KafkaPartitionKey, KafkaEventWrapper> msg)
        {
            myProducer.send(msg);
        }

        public void send(
            List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs)
        {
            myProducer.send(msgs);
        }

        @PreDestroy
        public void stop()
        {
            LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
            if (myProducer != null) {
                myProducer.close();
            }
        }
    }

On Fri, Jan 30, 2015 at 1:08 PM, ankit tyagi <ankittyagi.mn...@gmail.com> wrote:

> I have shared the object histograms from before and after GC on gist:
> https://gist.github.com/ankit1987/f4a04a1350fdd609096d
>
> On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai <jai.forums2...@gmail.com>
> wrote:
>
>> What kind of a (managed) component is that which has the @PreDestroy?
>> Looking at the previous snippet you added, it looks like you are creating
>> the Producer in some method? If you are going to close the producer in a
>> @PreDestroy of the component, then you should be creating the producer in
>> the @PostConstruct of the same component, so that you have proper lifecycle
>> management of those resources.
>>
>> -Jaikiran
>>
>> On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:
>>
>>> Hi,
>>>
>>> I am closing my producer at the time of shutting down my application.
>>>
>>>     @PreDestroy
>>>     public void stop()
>>>     {
>>>         LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
>>>         if (myProducer != null) {
>>>             myProducer.close();
>>>         }
>>>     }
>>>
>>> On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy <ku...@nmsworks.co.in>
>>> wrote:
>>>
>>>> Hope you are closing the producers. Can you share the attachment through
>>>> gist/pastebin?
>>>>
>>>> On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi
>>>> <ankittyagi.mn...@gmail.com> wrote:
>>>>
>>>>> Hi Jaikiran,
>>>>>
>>>>> I am using Ubuntu and was able to reproduce on Red Hat too. Please find
>>>>> more information below.
>>>>>
>>>>>     DISTRIB_ID=Ubuntu
>>>>>     DISTRIB_RELEASE=12.04
>>>>>     DISTRIB_CODENAME=precise
>>>>>     DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"
>>>>>
>>>>>     java version "1.7.0_72"
>>>>>
>>>>> This is happening on the client side. The output of lsof showed that
>>>>> most of the fds were FIFO and anon, but after GC the fd count was
>>>>> reduced significantly.
>>>>>
>>>>> Below is the client code I am using for publishing messages.
>>>>>
>>>>>     private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;
>>>>>
>>>>>     myProducer = new Producer<>(new ProducerConfig(myProducerProperties));
>>>>>
>>>>>     public void send(
>>>>>         List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs)
>>>>>     {
>>>>>         myProducer.send(msgs);
>>>>>     }
>>>>>
>>>>> We are using the sync producer. I am attaching object histograms from
>>>>> before GC (histo_1) and after GC (histo_2) in my application.
>>>>>
>>>>> On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai
>>>>> <jai.forums2...@gmail.com> wrote:
>>>>>
>>>>>> Which operating system are you on and what Java version? Depending on
>>>>>> the OS, you could get tools (like lsof) to show which file descriptors
>>>>>> are being held on to. Is it the client JVM which ends up with these
>>>>>> leaks?
>>>>>>
>>>>>> Also, would it be possible to post a snippet of your application code
>>>>>> which shows how you are using the Kafka APIs?
>>>>>>
>>>>>> -Jaikiran
>>>>>>
>>>>>> On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Currently we are using the sync producer client of version 0.8.1 on
>>>>>>> our production box. We are getting the following exception while
>>>>>>> publishing a Kafka message:
>>>>>>>
>>>>>>>     [2015-01-29 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89] Fetching topic metadata with correlation id 10808 for topics [Set(kafka_topic_coms_FD_test1)] from broker [id:0,host:localhost,port:9092] failed
>>>>>>>     java.net.ConnectException: Connection refused
>>>>>>>         at sun.nio.ch.Net.connect0(Native Method)
>>>>>>>         at sun.nio.ch.Net.connect(Net.java:465)
>>>>>>>         at sun.nio.ch.Net.connect(Net.java:457)
>>>>>>>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>>>>>>>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>>>>>>>         at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
>>>>>>>         at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
>>>>>>>         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>>>>>>>         at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>>>>>>>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>>>>>>>         at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>>>>>>
>>>>>>> We are using a dynamic thread pool to publish messages to Kafka. My
>>>>>>> observation is that when the threads in my executor are destroyed
>>>>>>> after the keep-alive time, the file descriptors are somehow not
>>>>>>> released, but when I explicitly ran a full GC, the fd count dropped
>>>>>>> by a significant amount.
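
To watch the descriptor count around a GC from inside the client JVM, rather than through lsof, something like the following might help. It is only a sketch: the class name FdCountProbe and the method openFdCount() are made up for illustration, and it assumes a HotSpot/OpenJDK JVM on a Unix-like OS, where the platform bean implements com.sun.management.UnixOperatingSystemMXBean. On other JVMs it simply reports -1.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Hypothetical helper, not part of the application code in this thread.
public class FdCountProbe
{
    /**
     * Returns the number of file descriptors currently open in this JVM,
     * or -1 if the JVM does not expose that figure (assumption: only
     * Unix-like HotSpot/OpenJDK builds do).
     */
    static long openFdCount()
    {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os)
                .getOpenFileDescriptorCount();
        }
        return -1L;
    }

    public static void main(String[] args)
    {
        long before = openFdCount();
        // System.gc() is only a request; the JVM may ignore it.
        System.gc();
        long after = openFdCount();
        System.out.println("open fds before GC: " + before
            + ", after GC: " + after);
    }
}
```

Logging openFdCount() periodically from the application, and once more after a forced GC, should show whether the count follows the pattern described in the original report.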