Hi Jay,

I have debugged further, and it looks like the problem was in my test case.
My main thread finished right after making the send calls, without waiting
for *kafka-producer-network-thread* to send all the messages in the buffer,
so my test case was failing.
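
The race described above can be sketched with the JDK alone. This is only an
illustration under stated assumptions, not the Kafka client itself: the
single-thread executor is a hypothetical stand-in for
kafka-producer-network-thread, each submitted task stands in for one send
callback, and the class and method names are made up for the example. The
point is that the calling thread waits on a latch until every callback has
fired instead of returning immediately.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the test: one background thread plays the role
// of kafka-producer-network-thread, and each task plays the role of a
// send callback.
class AsyncSendWaitSketch {

    // Queues `messages` fake sends, then blocks until every callback has run
    // or the timeout expires. Returns the number of callbacks that fired.
    static int sendAndWait(int messages, long timeoutSeconds) {
        ExecutorService networkThread = Executors.newSingleThreadExecutor();
        CountDownLatch pending = new CountDownLatch(messages);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < messages; i++) {
                networkThread.submit(() -> {
                    completed.incrementAndGet(); // "callback" bookkeeping
                    pending.countDown();         // one fewer message in flight
                });
            }
            // Without this wait the caller could return (and the JVM could
            // exit) while messages are still sitting in the queue.
            pending.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            networkThread.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("callbacks fired: " + sendAndWait(1000, 10));
    }
}
```

In the real test, the same idea would mean counting down a latch inside
ProducerCallBackHandler.onCompletion and awaiting it at the end of the test
method.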


I have a few questions now:
 i) Is there any parameter or flag to find out whether all messages in the
buffer have been published? I want to attach a shutdown hook so that I can
make sure all my messages have been published at the time of a restart.

 ii) If I am using the async producer with a callback handler, will the retry
logic for a rebalance exception or any other exception be handled by
kafka-producer-network-thread, or does that logic have to be implemented in
the callback handler?
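
To make the shutdown-hook idea in (i) concrete, here is a minimal JDK-only
sketch. Everything in it is hypothetical: the executor stands in for the
producer's background sender, and ShutdownDrainSketch, drain and
installShutdownHook are illustrative names, not Kafka APIs. With the real
client, my understanding of the 0.8.2 javadoc is that KafkaProducer.close()
blocks until in-flight requests complete, so the hook would call that
instead; please treat this as an assumption to verify.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a buffered async sender that is drained on shutdown.
class ShutdownDrainSketch {
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    private final AtomicInteger published = new AtomicInteger();

    // Queues a message; the background thread "publishes" it later.
    void send(String message) {
        sender.submit(() -> { published.incrementAndGet(); });
    }

    // Stops accepting new messages and waits for the queued ones to finish.
    // Returns true if everything was published within the timeout.
    boolean drain(long timeoutSeconds) {
        sender.shutdown();
        try {
            return sender.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int publishedCount() {
        return published.get();
    }

    // Registers a JVM shutdown hook that drains the buffer before exit.
    void installShutdownHook() {
        Runtime.getRuntime().addShutdownHook(
            new Thread(() -> drain(30), "producer-drain-hook"));
    }
}
```

The 30-second timeout in the hook is arbitrary; a hook that blocks for too
long delays JVM exit, so a bounded wait seems safer than waiting forever.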

On Tue, Feb 17, 2015 at 4:42 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Sounds like a potential bug, and it sounds like you can easily reproduce
> it. Can you post your test code and a description of the server version and
> how you started/configured it, and what you expect to see from your test
> and what you actually see:
> https://issues.apache.org/jira/browse/KAFKA/
>
> This will help make the debugging a bit easier on our end and avoid
> confusion.
>
> As Steven mentions your description of batch.size isn't quite right. The
> batch size is the buffer in bytes used for batching messages together--that
> is the target size the client tries to batch together for a partition (it
> won't batch together more than that). Setting it to 1 won't affect the
> sync/async aspects of the send but it will ensure that each message is sent
> in a request with no other messages from that partition (no reason you
> would want that). So setting this to 1 is maybe weird but you should still
> see all the callbacks fire.
>
> -Jay
>
> On Mon, Feb 16, 2015 at 12:11 PM, ankit tyagi <ankittyagi.mn...@gmail.com>
> wrote:
>
> > Hey,
> >
> > I am doing a POC on Kafka 0.8.2.0. Currently I am using the 0.8.2.0
> > kafka-client to produce async messages with a callback handler.
> >
> > I am using batch.size = 1 in my producer client. As per my understanding,
> > this should behave like a sync client, though messages can be published
> > from different threads. The problem is that my callback handler is not
> > being called after a message is sent successfully to the server.
> >
> > Below is the code of my KafkaProducer class:
> >
> > public class KafkaProducer
> > {
> >     private static Logger LOG =
> >         LoggerFactory.getLogger(KafkaProducer.class);
> >
> >     private Properties kafkaProducerProp = new Properties();
> >     private org.apache.kafka.clients.producer.KafkaProducer<String, String>
> >         myProducer;
> >
> >     public void initializeProperty()
> >     {
> >         kafkaProducerProp.put("bootstrap.servers", "localhost:9092");
> >         kafkaProducerProp.put("value.serializer",
> > "org.apache.kafka.common.serialization.StringSerializer");
> >         kafkaProducerProp.put("key.serializer",
> > "org.apache.kafka.common.serialization.StringSerializer");
> >         kafkaProducerProp.put("client.id", "ankitLocalTest");
> >         kafkaProducerProp.put("batch.size", 1);
> >     }
> >
> >     public void initialize()
> >     {
> >         myProducer = new
> > org.apache.kafka.clients.producer.KafkaProducer<String,
> > String>(kafkaProducerProp);
> >     }
> >
> >     public void sendMessage(String myEventKey, String myEventValue)
> >     {
> >
> >         ProducerRecord<String, String> myProducerRecord = new
> > ProducerRecord<String, String>("testTopic2", myEventKey,
> >             myEventValue);
> >         myProducer.send(myProducerRecord, new ProducerCallBackHandler());
> >     }
> >
> >     public void sendBlockingMessage(String myEventKey,
> >         String myEventValue)
> >     {
> >
> >         ProducerRecord<String, String> myProducerRecord = new
> > ProducerRecord<String, String>("testTopic", myEventKey,
> >             myEventValue);
> >         try {
> >             RecordMetadata recordMetaData =
> >                 myProducer.send(myProducerRecord).get();
> >             LOG.info("offset : {} and partition : {} of last published "
> >                 + "message in blocking mode",
> >                 recordMetaData.offset(), recordMetaData.partition());
> >         }
> >         catch (Exception e) {
> >             // Log the exception itself so the stack trace is preserved.
> >             LOG.error("Exception occurred while publishing event", e);
> >         }
> >
> >     }
> >
> > }
> >
> >
> >
> >
> > public class ProducerCallBackHandler implements Callback
> > {
> >
> >     Logger LOG = LoggerFactory.getLogger(ProducerCallBackHandler.class);
> >
> >     @Override
> >     public void onCompletion(RecordMetadata recordMetaData, Exception
> > exceptionOccured)
> >     {
> >        // On failure the metadata is null and the exception is set, so
> >        // check the exception first to avoid hiding the actual cause.
> >        if (exceptionOccured != null) {
> >            LOG.error("Exception occurred while publishing event on Kafka",
> >                exceptionOccured);
> >            return;
> >        }
> >        if (recordMetaData == null) {
> >            LOG.info("Error occurred while publishing event on Kafka");
> >            return;
> >        }
> >
> >        LOG.info("Message published on kafka with offset : {} and "
> >            + "partition : {}",
> >            recordMetaData.offset(), recordMetaData.partition());
> >     }
> >
> > }
> >
> >
> >
> > I wrote a JUnit test case to publish around 1000 messages to the topic.
> >
> > public class KafkaProducerTest
> > {
> >
> >     @Test
> >     @Ignore
> >     public void ProducerBlockingTest()
> >     {
> >         KafkaProducer producer = new KafkaProducer();
> >         producer.initializeProperty();
> >         producer.initialize();
> >         for (int i = 0; i < 1000; i++) {
> >             producer.sendBlockingMessage("firstMessageKey",
> > "firstMessageValue");
> >         }
> >     }
> >
> >     @Test
> >     public void ProducerNonBlockingTest()
> >     {
> >         KafkaProducer producer = new KafkaProducer();
> >         producer.initializeProperty();
> >         producer.initialize();
> >         for (int i = 0; i < 1000; i++) {
> >             producer.sendMessage("firstMessageKey", "firstMessageValue");
> >         }
> >
> >     }
> >
> > }
> >
> >
> > When I looked at my logs, they show that the callback handler was called
> > until message 48, but not after that:
> >
> > [2015-02-17 01:30:26.068][kafka-producer-network-thread |
> > ankitLocalTest][INFO][ProducerCallBackHandler:25] Message published on
> > kafka with offset : 46 and  partition : 0
> > [2015-02-17 01:30:26.071][kafka-producer-network-thread |
> > ankitLocalTest][INFO][ProducerCallBackHandler:25] Message published on
> > kafka with offset : 47 and  partition : 0
> > [2015-02-17 01:30:26.072][*kafka-producer-network-thread *|
> > ankitLocalTest][INFO][ProducerCallBackHandler:25] *Message published on
> > kafka with offset : 48 and  partition : 0*
> >
> >
> > *Am I doing something wrong??*
> >
>
