Attaching my whole producer code below.

Creating the KafkaProducer bean in XML:

<bean id="kafkaESBProducer"
      class="com.snapdeal.coms.kafka.KafkaProducer">
    <constructor-arg name="topic"
        value="${KAFKA_ESB_TOPIC_NAME:kafka_topic_coms_esb_${COMS_PROFILE:dev}_${USER}}" />
    <constructor-arg name="producerProperties">
        <props>
            <prop key="metadata.broker.list">${KAFKA_PRODUCER_BROKER_LIST}</prop>
        </props>
    </constructor-arg>
</bean>
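
For completeness: the ${NAME:default} placeholders in the bean above only
resolve if a placeholder configurer is registered in the application context.
A minimal sketch of that registration (assuming Spring 3.1+, where this line
registers a PropertySourcesPlaceholderConfigurer that reads system properties
and environment variables; the context namespace must be declared):

<!-- sketch, assuming Spring 3.1+ with the context namespace declared -->
<context:property-placeholder/>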


package com.snapdeal.coms.kafka;

// Imports reconstructed for completeness; project-local types
// (COMSConfiguration, PropertyParam, CommonPropertyParam,
// KafkaPartitionKey, KafkaEventWrapper) are assumed to be on the classpath.
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Properties;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer
{
    private static final Logger LOG =
        LoggerFactory.getLogger(KafkaProducer.class);
    private static final String SYSTEM_USER_NAME_PROPERTY = "user.name";
    private static final String CONFIG_PARAM_CLIENT_ID = "client.id";
    private static final String CLIENT_ID_FORMAT_STR =
        "kafka.coms.producer.%s.%s.%s";

    private enum ConfigParam
    {
        SERIALIZER_CLASS("serializer.class",
            CommonPropertyParam.KAFKA_PRODUCER_SERIALIZER_CLASS),
        KEY_SERIALIZER_CLASS("key.serializer.class",
            CommonPropertyParam.KAFKA_PRODUCER_PARTITION_KEY_SERIALIZER_CLASS),
        // Commented out to use the Kafka default partitioner:
        // PARTITIONER_CLASS("partitioner.class",
        //     CommonPropertyParam.KAFKA_PRODUCER_PARTITIONER_CLASS),
        REQUEST_REQUIRED_ACKS("request.required.acks",
            CommonPropertyParam.KAFKA_PRODUCER_REQUEST_REQUIRED_ACKS);

        private final String myName;
        private final PropertyParam myParam;

        ConfigParam(String name, PropertyParam param)
        {
            myName = name;
            myParam = param;
        }

        public String getName()
        {
            return myName;
        }

        public PropertyParam getParam()
        {
            return myParam;
        }
    }

    private final String myTopic;
    private final Properties myProducerProperties;

    private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;

    @Autowired
    private COMSConfiguration myAppConfig;

    public KafkaProducer(String topic, Properties producerProperties)
    {
        LOG.info("Creating Kafka Producer instance: {}", this);

        myTopic = topic;
        myProducerProperties = producerProperties;
    }

    @PostConstruct
    private void initializeProducer()
    {
        LOG.info("Initializing Kafka Producer for topic: {}", getTopic());

        // Build a unique client id: topic + user + JVM name
        // (RuntimeMXBean name is typically "pid@host"; '@' is replaced)
        String currentUser = System.getProperty(SYSTEM_USER_NAME_PROPERTY);
        String currentJVMName = ManagementFactory.getRuntimeMXBean().getName();
        currentJVMName = currentJVMName.replace('@', '_');

        String uniqueClientId = String.format(CLIENT_ID_FORMAT_STR,
            getTopic(), currentUser, currentJVMName);
        // Must be containsKey(): Properties.contains() checks values, not keys
        if (myProducerProperties.containsKey(CONFIG_PARAM_CLIENT_ID)) {
            uniqueClientId += ":"
                + myProducerProperties.getProperty(CONFIG_PARAM_CLIENT_ID);
        }
        myProducerProperties.setProperty(CONFIG_PARAM_CLIENT_ID,
            uniqueClientId);

        // Set reasonable defaults for required params
        for (ConfigParam cp : ConfigParam.values()) {
            if (!myProducerProperties.containsKey(cp.getName())) {
                String cpValue = myAppConfig.getPropertyValue(cp.getParam());
                myProducerProperties.setProperty(cp.getName(), cpValue);
            }
        }

        myProducer =
            new Producer<>(new ProducerConfig(myProducerProperties));

        LOG.info("Initialized Kafka Producer for topic: {} and properties {}",
            getTopic(), myProducerProperties);
    }

    public String getTopic()
    {
        return myTopic;
    }

    public Producer<KafkaPartitionKey, KafkaEventWrapper> getProducer()
    {
        return myProducer;
    }

    public void send(KeyedMessage<KafkaPartitionKey, KafkaEventWrapper> msg)
    {
        myProducer.send(msg);
    }

    public void send(
        List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs)
    {
        myProducer.send(msgs);
    }

    @PreDestroy
    public void stop()
    {
        LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
        if (myProducer != null) {
            myProducer.close();
        }
    }
}
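
For reference, this is roughly how the bean gets used once the context is up.
The KafkaPartitionKey and KafkaEventWrapper constructors below are
illustrative placeholders (I'm not pasting their real signatures); only
KeyedMessage is the actual Kafka 0.8.x API:

// Usage sketch -- the key/wrapper constructors are hypothetical
ApplicationContext ctx =
    new ClassPathXmlApplicationContext("applicationContext.xml"); // illustrative file name
KafkaProducer producer = ctx.getBean("kafkaESBProducer", KafkaProducer.class);

KeyedMessage<KafkaPartitionKey, KafkaEventWrapper> msg =
    new KeyedMessage<>(producer.getTopic(),
        new KafkaPartitionKey("order-123"),       // hypothetical constructor
        new KafkaEventWrapper("ORDER_CREATED"));  // hypothetical constructor
producer.send(msg);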


On Fri, Jan 30, 2015 at 1:08 PM, ankit tyagi <ankittyagi.mn...@gmail.com>
wrote:

> I have shared object histogram after and before gc on gist
> https://gist.github.com/ankit1987/f4a04a1350fdd609096d
>
> On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai <jai.forums2...@gmail.com>
> wrote:
>
>> What kind of a (managed) component is that which has the @PreDestroy?
>> Looking at the previous snippet you added, it looks like you are creating
>> the Producer in some method? If you are going to close the producer in a
>> @PreDestroy of the component, then you should be creating the producer in
>> the @PostConstruct of the same component, so that you have proper lifecycle
>> management of those resources.
>>
>>
>> -Jaikiran
>>
>> On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:
>>
>>> Hi,
>>>
>>> I am closing my producer at the time of shutting down my application.
>>>
>>> @PreDestroy
>>>      public void stop()
>>>      {
>>>          LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
>>>          if (myProducer != null) {
>>>              myProducer.close();
>>>          }
>>>      }
>>>
>>>
>>>
>>> On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy <ku...@nmsworks.co.in>
>>> wrote:
>>>
>>>> Hope you are closing the producers. Can you share the attachment through
>>>> gist/pastebin?
>>>>
>>>> On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi <
>>>> ankittyagi.mn...@gmail.com>
>>>> wrote:
>>>>
>>>>  Hi Jaikiran,
>>>>>
>>>>> I am using Ubuntu and was able to reproduce on Red Hat too. Please find
>>>>> more information below.
>>>>>
>>>>>
>>>>> DISTRIB_ID=Ubuntu
>>>>> DISTRIB_RELEASE=12.04
>>>>> DISTRIB_CODENAME=precise
>>>>> DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"
>>>>>
>>>>> java version "1.7.0_72"
>>>>>
>>>>> This is happening on the client side. The output of lsof showed that most
>>>>> fds were FIFO and anon, but after GC the FD count was reduced
>>>>> significantly.
>>>>>
>>>>> Below is my client code which I am using for publishing messages.
>>>>>
>>>>>
>>>>> private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;
>>>>>
>>>>> myProducer =
>>>>>     new Producer<>(new ProducerConfig(myProducerProperties));
>>>>>
>>>>> public void send(
>>>>>     List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs)
>>>>> {
>>>>>     myProducer.send(msgs);
>>>>> }
>>>>>
>>>>>
>>>>> We are using a sync producer. I am attaching the object histogram before
>>>>> GC (histo_1) and after GC (histo_2) in my application.
>>>>>
>>>>> On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai <
>>>>> jai.forums2...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Which operating system are you on and what Java version? Depending on the
>>>>>> OS, you could get tools (like lsof) to show which file descriptors are
>>>>>> being held on to. Is it the client JVM which ends up with these leaks?
>>>>>>
>>>>>> Also, would it be possible to post a snippet of your application code
>>>>>> which shows how you are using the Kafka APIs?
>>>>>>
>>>>>> -Jaikiran
>>>>>> On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:
>>>>>>
>>>>>>  Hi,
>>>>>>>
>>>>>>> Currently we are using the sync producer client of version 0.8.1 in our
>>>>>>> production box. We are getting the following exception while publishing a
>>>>>>> kafka message:
>>>>>>>
>>>>>>> [2015-01-29 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]
>>>>>>> Fetching topic metadata with correlation id 10808 for topics
>>>>>>> [Set(kafka_topic_coms_FD_test1)] from broker [id:0,host:localhost,port:9092]
>>>>>>> failed
>>>>>>> java.net.ConnectException: Connection refused
>>>>>>>         at sun.nio.ch.Net.connect0(Native Method)
>>>>>>>         at sun.nio.ch.Net.connect(Net.java:465)
>>>>>>>         at sun.nio.ch.Net.connect(Net.java:457)
>>>>>>>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>>>>>>>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>>>>>>>         at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
>>>>>>>         at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
>>>>>>>         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>>>>>>>         at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>>>>>>>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>>>>>>>         at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>>>>>>
>>>>>>>
>>>>>>> We are using a dynamic thread pool to publish messages to kafka. My
>>>>>>> observation is that after the keep-alive time, when the threads in my
>>>>>>> executor get destroyed, somehow the file descriptors are not getting
>>>>>>> cleared, but when I explicitly ran a full GC, the fd count got reduced by
>>>>>>> a significant amount.
>>>>>
>>>>>>
>>>>>>>
>>
>
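
P.S. For anyone else watching fd counts: a minimal sketch for tracking open
file descriptors from inside the JVM (assumes a Unix platform exposing
com.sun.management.UnixOperatingSystemMXBean; otherwise stick with lsof):

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import com.sun.management.UnixOperatingSystemMXBean;

public class FdWatcher
{
    public static void main(String[] args) throws InterruptedException
    {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (!(os instanceof UnixOperatingSystemMXBean)) {
            System.err.println("Open-fd count not exposed on this platform");
            return;
        }
        UnixOperatingSystemMXBean unixOs = (UnixOperatingSystemMXBean) os;
        while (true) {
            // Print open vs. max file descriptors every 10 seconds
            System.out.printf("open fds: %d / max: %d%n",
                unixOs.getOpenFileDescriptorCount(),
                unixOs.getMaxFileDescriptorCount());
            Thread.sleep(10000L);
        }
    }
}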
