Hey,

I am doing POC on kafka .8.2.0 version.Currently I am using kafka-client
of 0.8.2.0 version for producing async message with callBackHandler.

I am using batch.size =1 in my producer cleint. As per my Understanding ,
This should behave like a sync client though message can be published from
different thread. Problem is my callBackHandler is not being called after
message is sent successfully to the server..

Below is my code of KafkaProducer

public class KafkaProducer
{
    private static Logger LOG =
LoggerFactory.getLogger(KafkaProducer.class);



    private Properties kafkaProducerProp = new Properties();
    private org.apache.kafka.clients.producer.KafkaProducer<String, String>
myProducer;

    public void initializeProperty()
    {
        kafkaProducerProp.put("bootstrap.servers", "localhost:9092");
        kafkaProducerProp.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProp.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducerProp.put("client.id", "ankitLocalTest");
        kafkaProducerProp.put("batch.size", 1);
    }

    public void initialize()
    {
        myProducer = new
org.apache.kafka.clients.producer.KafkaProducer<String,
String>(kafkaProducerProp);
    }

    public void sendMessage(String myEventKey, String myEventValue)
    {

        ProducerRecord<String, String> myProducerRecord = new
ProducerRecord<String, String>("testTopic2", myEventKey,
            myEventValue);
        myProducer.send(myProducerRecord, new ProducerCallBackHandler());
    }

    public void sendBlockingMessage(String myEventKey, String myEventValue)
    {

        ProducerRecord<String, String> myProducerRecord = new
ProducerRecord<String, String>("testTopic", myEventKey,
            myEventValue);
        try {
            RecordMetadata
recordMetaData=myProducer.send(myProducerRecord).get();
            LOG.info("offset :{} and partition : {} of last published
message in blocking mode
",recordMetaData.offset(),recordMetaData.partition());
        }
        catch (Exception e) {
            LOG.info("Exception : {} occured while publishing
event",e.getCause());
        }

    }

}




public class ProducerCallBackHandler implements Callback
{

    Logger LOG = LoggerFactory.getLogger(ProducerCallBackHandler.class);

    @Override
    public void onCompletion(RecordMetadata recordMetaData, Exception
exceptionOccured)
    {
       if(recordMetaData == null){
           LOG.info("error Occured while publishing event on Kafka");
           return;
       }
       if(exceptionOccured != null){
           LOG.info("Exception :{} Occured while publishing event on Kafka"
+exceptionOccured.getCause());
           return;
       }

       LOG.info("Message published on kafka with offset : {} and  partition
: {}",recordMetaData.offset(),recordMetaData.partition());
    }

}



I wrote a junit case to publish around 1000 message on the topic.

public class KafkaProducerTest
{

    @Test
    @Ignore
    public void ProducerBlockingTest()
    {
        KafkaProducer producer = new KafkaProducer();
        producer.initializeProperty();
        producer.initialize();
        for (int i = 0; i < 1000; i++) {
            producer.sendBlockingMessage("firstMessageKey",
"firstMessageValue");
        }
    }

    @Test
    public void ProducerNonBlockingTest()
    {
        KafkaProducer producer = new KafkaProducer();
        producer.initializeProperty();
        producer.initialize();
        for (int i = 0; i < 1000; i++) {
            producer.sendMessage("firstMessageKey", "firstMessageValue");
        }

    }

}


when i saw my Logs, i shows call back handler was called till 48 messages,
not after that

[2015-02-17 01:30:26.068][kafka-producer-network-thread |
ankitLocalTest][INFO][ProducerCallBackHandler:25] Message published on
kafka with offset : 46 and  partition : 0
[2015-02-17 01:30:26.071][kafka-producer-network-thread |
ankitLocalTest][INFO][ProducerCallBackHandler:25] Message published on
kafka with offset : 47 and  partition : 0
[2015-02-17 01:30:26.072][*kafka-producer-network-thread *|
ankitLocalTest][INFO][ProducerCallBackHandler:25] *Message published on
kafka with offset : 48 and  partition : 0*


*Am I doing something wrong??*

Reply via email to